feat: Add weather updater worker and update fetcher for OneCall API

This commit is contained in:
Sosokker 2025-04-02 17:52:29 +07:00
parent 5a5683bf94
commit a4563342b7
3 changed files with 267 additions and 74 deletions

View File

@ -7,16 +7,19 @@ import (
) )
var ( var (
PORT int PORT int
POSTGRES_USER string POSTGRES_USER string
POSTGRES_PASSWORD string POSTGRES_PASSWORD string
POSTGRES_DB string POSTGRES_DB string
DATABASE_URL string DATABASE_URL string
GOOGLE_CLIENT_ID string GOOGLE_CLIENT_ID string
GOOGLE_CLIENT_SECRET string GOOGLE_CLIENT_SECRET string
GOOGLE_REDIRECT_URL string GOOGLE_REDIRECT_URL string
JWT_SECRET_KEY string JWT_SECRET_KEY string
RABBITMQ_URL string RABBITMQ_URL string
OPENWEATHER_API_KEY string
OPENWEATHER_CACHE_TTL string
WEATHER_FETCH_INTERVAL string
) )
func Load() { func Load() {
@ -30,6 +33,9 @@ func Load() {
viper.SetDefault("JWT_SECRET_KEY", "jwt_secret_key") viper.SetDefault("JWT_SECRET_KEY", "jwt_secret_key")
viper.SetDefault("GOOGLE_REDIRECT_URL", "http://localhost:8000/auth/login/google") viper.SetDefault("GOOGLE_REDIRECT_URL", "http://localhost:8000/auth/login/google")
viper.SetDefault("RABBITMQ_URL", "amqp://user:password@localhost:5672/") viper.SetDefault("RABBITMQ_URL", "amqp://user:password@localhost:5672/")
viper.SetDefault("OPENWEATHER_API_KEY", "openweather_api_key")
viper.SetDefault("OPENWEATHER_CACHE_TTL", "15m")
viper.SetDefault("WEATHER_FETCH_INTERVAL", "15m")
viper.SetConfigFile(".env") viper.SetConfigFile(".env")
viper.AddConfigPath("../../.") viper.AddConfigPath("../../.")
@ -50,4 +56,7 @@ func Load() {
GOOGLE_REDIRECT_URL = viper.GetString("GOOGLE_REDIRECT_URL") GOOGLE_REDIRECT_URL = viper.GetString("GOOGLE_REDIRECT_URL")
JWT_SECRET_KEY = viper.GetString("JWT_SECRET_KEY") JWT_SECRET_KEY = viper.GetString("JWT_SECRET_KEY")
RABBITMQ_URL = viper.GetString("RABBITMQ_URL") RABBITMQ_URL = viper.GetString("RABBITMQ_URL")
OPENWEATHER_API_KEY = viper.GetString("OPENWEATHER_API_KEY")
OPENWEATHER_CACHE_TTL = viper.GetString("OPENWEATHER_CACHE_TTL")
WEATHER_FETCH_INTERVAL = viper.GetString("WEATHER_FETCH_INTERVAL")
} }

View File

@ -1,9 +1,11 @@
// backend/internal/services/weather/openweathermap_fetcher.go
package weather package weather
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"log/slog" "log/slog"
"net/http" "net/http"
"net/url" "net/url"
@ -12,54 +14,45 @@ import (
"github.com/forfarm/backend/internal/domain" "github.com/forfarm/backend/internal/domain"
) )
const openWeatherMapAPIURL = "https://api.openweathermap.org/data/2.5/weather" const openWeatherMapOneCallAPIURL = "https://api.openweathermap.org/data/3.0/onecall"
type openWeatherMapResponse struct { type openWeatherMapOneCallResponse struct {
Coord struct { Lat float64 `json:"lat"`
Lon float64 `json:"lon"` Lon float64 `json:"lon"`
Lat float64 `json:"lat"` Timezone string `json:"timezone"`
} `json:"coord"` TimezoneOffset int `json:"timezone_offset"`
Weather []struct { Current *struct {
ID int `json:"id"` Dt int64 `json:"dt"` // Current time, Unix, UTC
Main string `json:"main"` Sunrise int64 `json:"sunrise"`
Description string `json:"description"` Sunset int64 `json:"sunset"`
Icon string `json:"icon"` Temp float64 `json:"temp"` // Kelvin by default, 'units=metric' for Celsius
} `json:"weather"` FeelsLike float64 `json:"feels_like"` // Kelvin by default
Base string `json:"base"` Pressure int `json:"pressure"` // hPa
Main struct { Humidity int `json:"humidity"` // %
Temp float64 `json:"temp"` // Kelvin by default DewPoint float64 `json:"dew_point"`
FeelsLike float64 `json:"feels_like"` // Kelvin by default Uvi float64 `json:"uvi"`
TempMin float64 `json:"temp_min"` // Kelvin by default Clouds int `json:"clouds"` // %
TempMax float64 `json:"temp_max"` // Kelvin by default Visibility int `json:"visibility"` // meters
Pressure int `json:"pressure"` // hPa WindSpeed float64 `json:"wind_speed"` // meter/sec by default
Humidity int `json:"humidity"` // % WindDeg int `json:"wind_deg"`
SeaLevel int `json:"sea_level"` // hPa WindGust float64 `json:"wind_gust,omitempty"`
GrndLevel int `json:"grnd_level"` // hPa Rain *struct {
} `json:"main"` OneH float64 `json:"1h"` // Rain volume for the last 1 hour, mm
Visibility int `json:"visibility"` // meters } `json:"rain,omitempty"`
Wind struct { Snow *struct {
Speed float64 `json:"speed"` // meter/sec OneH float64 `json:"1h"` // Snow volume for the last 1 hour, mm
Deg int `json:"deg"` // degrees (meteorological) } `json:"snow,omitempty"`
Gust float64 `json:"gust"` // meter/sec Weather []struct {
} `json:"wind"` ID int `json:"id"`
Rain struct { Main string `json:"main"`
OneH float64 `json:"1h"` // Rain volume for the last 1 hour, mm Description string `json:"description"`
} `json:"rain"` Icon string `json:"icon"`
Clouds struct { } `json:"weather"`
All int `json:"all"` // % } `json:"current,omitempty"`
} `json:"clouds"` // Minutely []...
Dt int64 `json:"dt"` // Time of data calculation, unix, UTC // Hourly []...
Sys struct { // Daily []...
Type int `json:"type"` // Alerts []...
ID int `json:"id"`
Country string `json:"country"`
Sunrise int64 `json:"sunrise"` // unix, UTC
Sunset int64 `json:"sunset"` // unix, UTC
} `json:"sys"`
Timezone int `json:"timezone"` // Shift in seconds from UTC
ID int `json:"id"` // City ID
Name string `json:"name"` // City name
Cod int `json:"cod"` // Internal parameter
} }
type OpenWeatherMapFetcher struct { type OpenWeatherMapFetcher struct {
@ -87,9 +80,11 @@ func (f *OpenWeatherMapFetcher) GetCurrentWeatherByCoords(ctx context.Context, l
queryParams.Set("lat", fmt.Sprintf("%.4f", lat)) queryParams.Set("lat", fmt.Sprintf("%.4f", lat))
queryParams.Set("lon", fmt.Sprintf("%.4f", lon)) queryParams.Set("lon", fmt.Sprintf("%.4f", lon))
queryParams.Set("appid", f.apiKey) queryParams.Set("appid", f.apiKey)
queryParams.Set("units", "metric") queryParams.Set("units", "metric") // Request Celsius and m/s
queryParams.Set("exclude", "minutely,hourly,daily,alerts") // Exclude parts we don't need now
fullURL := fmt.Sprintf("%s?%s", openWeatherMapAPIURL, queryParams.Encode()) fullURL := fmt.Sprintf("%s?%s", openWeatherMapOneCallAPIURL, queryParams.Encode())
f.logger.Debug("Fetching weather from OpenWeatherMap OneCall API", "url", fullURL)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fullURL, nil) req, err := http.NewRequestWithContext(ctx, http.MethodGet, fullURL, nil)
if err != nil { if err != nil {
@ -105,36 +100,55 @@ func (f *OpenWeatherMapFetcher) GetCurrentWeatherByCoords(ctx context.Context, l
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
f.logger.Error("OpenWeatherMap API returned non-OK status", "url", fullURL, "status_code", resp.StatusCode) // TODO: Read resp.Body to get error message from OpenWeatherMap
bodyBytes, _ := io.ReadAll(resp.Body)
f.logger.Error("OpenWeatherMap API returned non-OK status",
"url", fullURL,
"status_code", resp.StatusCode,
"body", string(bodyBytes))
return nil, fmt.Errorf("weather API request failed with status: %s", resp.Status) return nil, fmt.Errorf("weather API request failed with status: %s", resp.Status)
} }
var owmResp openWeatherMapResponse var owmResp openWeatherMapOneCallResponse
if err := json.NewDecoder(resp.Body).Decode(&owmResp); err != nil { if err := json.NewDecoder(resp.Body).Decode(&owmResp); err != nil {
f.logger.Error("Failed to decode OpenWeatherMap response", "error", err) f.logger.Error("Failed to decode OpenWeatherMap OneCall response", "error", err)
return nil, fmt.Errorf("failed to decode weather response: %w", err) return nil, fmt.Errorf("failed to decode weather response: %w", err)
} }
if len(owmResp.Weather) == 0 { if owmResp.Current == nil {
f.logger.Warn("OpenWeatherMap response missing weather details", "lat", lat, "lon", lon) f.logger.Warn("OpenWeatherMap OneCall response missing 'current' weather data", "lat", lat, "lon", lon)
return nil, fmt.Errorf("current weather data not found in API response")
}
current := owmResp.Current
if len(current.Weather) == 0 {
f.logger.Warn("OpenWeatherMap response missing weather description details", "lat", lat, "lon", lon)
return nil, fmt.Errorf("weather data description not found in response") return nil, fmt.Errorf("weather data description not found in response")
} }
weatherData := &domain.WeatherData{ // Create domain object using pointers for optional fields
Timestamp: time.Unix(owmResp.Dt, 0).UTC(), weatherData := &domain.WeatherData{} // Initialize empty struct first
TempCelsius: owmResp.Main.Temp,
Humidity: float64(owmResp.Main.Humidity), // Assign values using pointers, checking for nil where appropriate
Description: owmResp.Weather[0].Description, weatherData.TempCelsius = &current.Temp
Icon: owmResp.Weather[0].Icon, humidityFloat := float64(current.Humidity)
WindSpeed: owmResp.Wind.Speed, weatherData.Humidity = &humidityFloat
RainVolume1h: owmResp.Rain.OneH, weatherData.Description = &current.Weather[0].Description
weatherData.Icon = &current.Weather[0].Icon
weatherData.WindSpeed = &current.WindSpeed
if current.Rain != nil {
weatherData.RainVolume1h = &current.Rain.OneH
} }
observedTime := time.Unix(current.Dt, 0).UTC()
weatherData.ObservedAt = &observedTime
now := time.Now().UTC()
weatherData.WeatherLastUpdated = &now
f.logger.Debug("Successfully fetched weather data", f.logger.Debug("Successfully fetched weather data",
"lat", lat, "lat", lat,
"lon", lon, "lon", lon,
"temp", weatherData.TempCelsius, "temp", *weatherData.TempCelsius,
"description", weatherData.Description) "description", *weatherData.Description)
return weatherData, nil return weatherData, nil
} }

View File

@ -0,0 +1,170 @@
// backend/internal/workers/weather_updater.go
package workers
import (
"context"
"log/slog"
"sync"
"time"
"github.com/forfarm/backend/internal/domain"
"github.com/google/uuid"
)
type WeatherUpdater struct {
farmRepo domain.FarmRepository
weatherFetcher domain.WeatherFetcher
eventPublisher domain.EventPublisher
logger *slog.Logger
fetchInterval time.Duration
stopChan chan struct{} // Channel to signal stopping
wg sync.WaitGroup
}
func NewWeatherUpdater(
farmRepo domain.FarmRepository,
weatherFetcher domain.WeatherFetcher,
eventPublisher domain.EventPublisher,
logger *slog.Logger,
fetchInterval time.Duration,
) *WeatherUpdater {
if logger == nil {
logger = slog.Default()
}
if fetchInterval <= 0 {
fetchInterval = 15 * time.Minute
}
return &WeatherUpdater{
farmRepo: farmRepo,
weatherFetcher: weatherFetcher,
eventPublisher: eventPublisher,
logger: logger,
fetchInterval: fetchInterval,
stopChan: make(chan struct{}),
}
}
func (w *WeatherUpdater) Start(ctx context.Context) {
w.logger.Info("Starting Weather Updater worker", "interval", w.fetchInterval)
ticker := time.NewTicker(w.fetchInterval)
w.wg.Add(1)
go func() {
defer w.wg.Done()
defer ticker.Stop()
w.logger.Info("Weather Updater goroutine started")
w.fetchAndUpdateAllFarms(ctx)
for {
select {
case <-ticker.C:
w.logger.Info("Weather Updater tick: fetching weather data")
w.fetchAndUpdateAllFarms(ctx)
case <-w.stopChan:
w.logger.Info("Weather Updater received stop signal, stopping...")
return
case <-ctx.Done():
w.logger.Info("Weather Updater context cancelled, stopping...", "reason", ctx.Err())
return
}
}
}()
}
func (w *WeatherUpdater) Stop() {
w.logger.Info("Attempting to stop Weather Updater worker...")
close(w.stopChan)
w.wg.Wait()
w.logger.Info("Weather Updater worker stopped")
}
func (w *WeatherUpdater) fetchAndUpdateAllFarms(ctx context.Context) {
// Use a background context for the repository call if the main context might cancel prematurely
// repoCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) // Example timeout
// defer cancel()
// TODO: Need a GetAllFarms method in the FarmRepository or a way to efficiently get all farm locations.
farms, err := w.farmRepo.GetByOwnerID(ctx, "") // !! REPLACE with a proper GetAll method !!
if err != nil {
w.logger.Error("Failed to get farms for weather update", "error", err)
return
}
if len(farms) == 0 {
w.logger.Info("No farms found to update weather for.")
return
}
w.logger.Info("Found farms for weather update", "count", len(farms))
var fetchWg sync.WaitGroup
fetchCtx, cancelFetches := context.WithCancel(ctx)
defer cancelFetches()
for _, farm := range farms {
if farm.Lat == 0 && farm.Lon == 0 {
w.logger.Warn("Skipping farm with zero coordinates", "farm_id", farm.UUID, "farm_name", farm.Name)
continue
}
fetchWg.Add(1)
go func(f domain.Farm) {
defer fetchWg.Done()
select {
case <-fetchCtx.Done():
return
default:
w.fetchAndPublishWeather(fetchCtx, f)
}
}(farm)
}
fetchWg.Wait()
w.logger.Info("Finished weather fetch cycle for farms", "count", len(farms))
}
func (w *WeatherUpdater) fetchAndPublishWeather(ctx context.Context, farm domain.Farm) {
weatherData, err := w.weatherFetcher.GetCurrentWeatherByCoords(ctx, farm.Lat, farm.Lon)
if err != nil {
w.logger.Error("Failed to fetch weather data", "farm_id", farm.UUID, "lat", farm.Lat, "lon", farm.Lon, "error", err)
return
}
if weatherData == nil {
w.logger.Warn("Received nil weather data without error", "farm_id", farm.UUID)
return
}
payloadMap := map[string]interface{}{
"farm_id": farm.UUID,
"lat": farm.Lat,
"lon": farm.Lon,
"temp_celsius": weatherData.TempCelsius,
"humidity": weatherData.Humidity,
"description": weatherData.Description,
"icon": weatherData.Icon,
"wind_speed": weatherData.WindSpeed,
"rain_volume_1h": weatherData.RainVolume1h,
"observed_at": weatherData.ObservedAt,
"weather_last_updated": weatherData.WeatherLastUpdated,
}
event := domain.Event{
ID: uuid.NewString(),
Type: "weather.updated",
Source: "weather-updater-worker",
Timestamp: time.Now().UTC(),
AggregateID: farm.UUID,
Payload: payloadMap,
}
pubCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = w.eventPublisher.Publish(pubCtx, event)
if err != nil {
w.logger.Error("Failed to publish weather.updated event", "farm_id", farm.UUID, "event_id", event.ID, "error", err)
} else {
w.logger.Debug("Published weather.updated event", "farm_id", farm.UUID, "event_id", event.ID)
}
}