mirror of
https://github.com/ForFarmTeam/ForFarm.git
synced 2025-12-18 13:34:08 +01:00
Merge branch 'main' into feature-knowledge-hub
This commit is contained in:
commit
983c3a77cd
@ -16,6 +16,7 @@ import (
|
||||
"github.com/go-chi/cors"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
|
||||
"github.com/forfarm/backend/internal/cache"
|
||||
"github.com/forfarm/backend/internal/config"
|
||||
"github.com/forfarm/backend/internal/domain"
|
||||
m "github.com/forfarm/backend/internal/middlewares"
|
||||
@ -29,6 +30,7 @@ type api struct {
|
||||
logger *slog.Logger
|
||||
httpClient *http.Client
|
||||
eventPublisher domain.EventPublisher
|
||||
cache cache.Cache
|
||||
|
||||
userRepo domain.UserRepository
|
||||
cropRepo domain.CroplandRepository
|
||||
@ -54,17 +56,21 @@ func NewAPI(
|
||||
pool *pgxpool.Pool,
|
||||
eventPublisher domain.EventPublisher,
|
||||
analyticsRepo domain.AnalyticsRepository,
|
||||
inventoryRepo domain.InventoryRepository,
|
||||
croplandRepo domain.CroplandRepository,
|
||||
farmRepo domain.FarmRepository,
|
||||
) *api {
|
||||
|
||||
client := &http.Client{}
|
||||
|
||||
logger.Info("creating memory cache")
|
||||
memoryCache := cache.NewMemoryCache(1*time.Hour, 2*time.Hour)
|
||||
|
||||
userRepository := repository.NewPostgresUser(pool)
|
||||
plantRepository := repository.NewPostgresPlant(pool)
|
||||
plantRepository := repository.NewPostgresPlant(pool, memoryCache)
|
||||
inventoryRepo := repository.NewPostgresInventory(pool, eventPublisher, memoryCache)
|
||||
harvestRepository := repository.NewPostgresHarvest(pool, memoryCache)
|
||||
knowledgeHubRepository := repository.NewPostgresKnowledgeHub(pool)
|
||||
harvestRepository := repository.NewPostgresHarvest(pool)
|
||||
croplandRepo := repository.NewPostgresCropland(pool)
|
||||
croplandRepo.SetEventPublisher(eventPublisher)
|
||||
|
||||
owmFetcher := weather.NewOpenWeatherMapFetcher(config.OPENWEATHER_API_KEY, client, logger)
|
||||
cacheTTL, err := time.ParseDuration(config.OPENWEATHER_CACHE_TTL)
|
||||
@ -88,6 +94,7 @@ func NewAPI(
|
||||
logger: logger,
|
||||
httpClient: client,
|
||||
eventPublisher: eventPublisher,
|
||||
cache: memoryCache,
|
||||
|
||||
userRepo: userRepository,
|
||||
cropRepo: croplandRepo,
|
||||
|
||||
14
backend/internal/cache/cache.go
vendored
Normal file
14
backend/internal/cache/cache.go
vendored
Normal file
@ -0,0 +1,14 @@
|
||||
package cache
|
||||
|
||||
import "time"
|
||||
|
||||
type Cache interface {
|
||||
Get(key string) (interface{}, bool)
|
||||
Set(key string, value interface{}, ttl time.Duration)
|
||||
Delete(key string)
|
||||
}
|
||||
|
||||
const (
|
||||
DefaultExpiration time.Duration = 0
|
||||
NoExpiration time.Duration = -1
|
||||
)
|
||||
37
backend/internal/cache/memory_cache.go
vendored
Normal file
37
backend/internal/cache/memory_cache.go
vendored
Normal file
@ -0,0 +1,37 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
gocache "github.com/patrickmn/go-cache"
|
||||
)
|
||||
|
||||
type memoryCache struct {
|
||||
client *gocache.Cache
|
||||
}
|
||||
|
||||
func NewMemoryCache(defaultExpiration, cleanupInterval time.Duration) Cache {
|
||||
return &memoryCache{
|
||||
client: gocache.New(defaultExpiration, cleanupInterval),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *memoryCache) Get(key string) (interface{}, bool) {
|
||||
return m.client.Get(key)
|
||||
}
|
||||
|
||||
func (m *memoryCache) Set(key string, value interface{}, ttl time.Duration) {
|
||||
var expiration time.Duration
|
||||
if ttl == DefaultExpiration {
|
||||
expiration = gocache.DefaultExpiration
|
||||
} else if ttl == NoExpiration {
|
||||
expiration = gocache.NoExpiration
|
||||
} else {
|
||||
expiration = ttl
|
||||
}
|
||||
m.client.Set(key, value, expiration)
|
||||
}
|
||||
|
||||
func (m *memoryCache) Delete(key string) {
|
||||
m.client.Delete(key)
|
||||
}
|
||||
@ -55,11 +55,6 @@ func APICmd(ctx context.Context) *cobra.Command {
|
||||
farmRepo := repository.NewPostgresFarm(pool)
|
||||
farmRepo.SetEventPublisher(eventBus)
|
||||
|
||||
inventoryRepo := repository.NewPostgresInventory(pool, eventBus)
|
||||
|
||||
croplandRepo := repository.NewPostgresCropland(pool)
|
||||
croplandRepo.SetEventPublisher(eventBus)
|
||||
|
||||
projection := event.NewFarmAnalyticsProjection(eventBus, analyticsRepo, logger)
|
||||
go func() {
|
||||
if err := projection.Start(ctx); err != nil {
|
||||
@ -68,7 +63,7 @@ func APICmd(ctx context.Context) *cobra.Command {
|
||||
}()
|
||||
logger.Info("Farm Analytics Projection started")
|
||||
|
||||
apiInstance := api.NewAPI(ctx, logger, pool, eventBus, analyticsRepo, inventoryRepo, croplandRepo, farmRepo)
|
||||
apiInstance := api.NewAPI(ctx, logger, pool, eventBus, analyticsRepo, farmRepo)
|
||||
|
||||
weatherFetcher := apiInstance.GetWeatherFetcher()
|
||||
weatherInterval, err := time.ParseDuration(config.WEATHER_FETCH_INTERVAL)
|
||||
|
||||
@ -2,19 +2,34 @@ package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
|
||||
"github.com/forfarm/backend/internal/cache"
|
||||
"github.com/forfarm/backend/internal/domain"
|
||||
)
|
||||
|
||||
const (
|
||||
cacheKeyHarvestUnits = "harvest:units"
|
||||
)
|
||||
|
||||
type postgresHarvestRepository struct {
|
||||
conn Connection
|
||||
conn Connection
|
||||
cache cache.Cache
|
||||
}
|
||||
|
||||
func NewPostgresHarvest(conn Connection) domain.HarvestRepository {
|
||||
return &postgresHarvestRepository{conn: conn}
|
||||
func NewPostgresHarvest(conn Connection, c cache.Cache) domain.HarvestRepository {
|
||||
return &postgresHarvestRepository{conn: conn, cache: c}
|
||||
}
|
||||
|
||||
func (p *postgresHarvestRepository) GetUnits(ctx context.Context) ([]domain.HarvestUnit, error) {
|
||||
if cached, found := p.cache.Get(cacheKeyHarvestUnits); found {
|
||||
if units, ok := cached.([]domain.HarvestUnit); ok {
|
||||
slog.DebugContext(ctx, "Cache hit for GetHarvestUnits", "key", cacheKeyHarvestUnits)
|
||||
return units, nil
|
||||
}
|
||||
}
|
||||
slog.DebugContext(ctx, "Cache miss for GetHarvestUnits", "key", cacheKeyHarvestUnits)
|
||||
|
||||
query := `SELECT id, name FROM harvest_units ORDER BY id`
|
||||
rows, err := p.conn.Query(ctx, query)
|
||||
if err != nil {
|
||||
@ -30,5 +45,13 @@ func (p *postgresHarvestRepository) GetUnits(ctx context.Context) ([]domain.Harv
|
||||
}
|
||||
units = append(units, u)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(units) > 0 {
|
||||
p.cache.Set(cacheKeyHarvestUnits, units, cacheTTLStatic)
|
||||
}
|
||||
|
||||
return units, nil
|
||||
}
|
||||
|
||||
@ -3,20 +3,28 @@ package repository
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/forfarm/backend/internal/cache"
|
||||
"github.com/forfarm/backend/internal/domain"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
cacheKeyInventoryStatuses = "inventory:statuses"
|
||||
cacheKeyInventoryCategories = "inventory:categories"
|
||||
)
|
||||
|
||||
type postgresInventoryRepository struct {
|
||||
conn Connection
|
||||
eventPublisher domain.EventPublisher
|
||||
cache cache.Cache
|
||||
}
|
||||
|
||||
func NewPostgresInventory(conn Connection, publisher domain.EventPublisher) domain.InventoryRepository {
|
||||
return &postgresInventoryRepository{conn: conn, eventPublisher: publisher}
|
||||
func NewPostgresInventory(conn Connection, publisher domain.EventPublisher, c cache.Cache) domain.InventoryRepository {
|
||||
return &postgresInventoryRepository{conn: conn, eventPublisher: publisher, cache: c}
|
||||
}
|
||||
|
||||
func (p *postgresInventoryRepository) fetch(ctx context.Context, query string, args ...interface{}) ([]domain.InventoryItem, error) {
|
||||
@ -342,6 +350,14 @@ func (p *postgresInventoryRepository) Delete(ctx context.Context, id, userID str
|
||||
}
|
||||
|
||||
func (p *postgresInventoryRepository) GetStatuses(ctx context.Context) ([]domain.InventoryStatus, error) {
|
||||
if cached, found := p.cache.Get(cacheKeyInventoryStatuses); found {
|
||||
if statuses, ok := cached.([]domain.InventoryStatus); ok {
|
||||
slog.DebugContext(ctx, "Cache hit for GetInventoryStatuses", "key", cacheKeyInventoryStatuses)
|
||||
return statuses, nil
|
||||
}
|
||||
}
|
||||
slog.DebugContext(ctx, "Cache miss for GetInventoryStatuses", "key", cacheKeyInventoryStatuses)
|
||||
|
||||
query := `SELECT id, name FROM inventory_status ORDER BY id`
|
||||
rows, err := p.conn.Query(ctx, query)
|
||||
if err != nil {
|
||||
@ -357,10 +373,26 @@ func (p *postgresInventoryRepository) GetStatuses(ctx context.Context) ([]domain
|
||||
}
|
||||
statuses = append(statuses, s)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(statuses) > 0 {
|
||||
p.cache.Set(cacheKeyInventoryStatuses, statuses, cacheTTLStatic)
|
||||
}
|
||||
|
||||
return statuses, nil
|
||||
}
|
||||
|
||||
func (p *postgresInventoryRepository) GetCategories(ctx context.Context) ([]domain.InventoryCategory, error) {
|
||||
if cached, found := p.cache.Get(cacheKeyInventoryCategories); found {
|
||||
if categories, ok := cached.([]domain.InventoryCategory); ok {
|
||||
slog.DebugContext(ctx, "Cache hit for GetInventoryCategories", "key", cacheKeyInventoryCategories)
|
||||
return categories, nil
|
||||
}
|
||||
}
|
||||
slog.DebugContext(ctx, "Cache miss for GetInventoryCategories", "key", cacheKeyInventoryCategories)
|
||||
|
||||
query := `SELECT id, name FROM inventory_category ORDER BY id`
|
||||
rows, err := p.conn.Query(ctx, query)
|
||||
if err != nil {
|
||||
@ -376,5 +408,13 @@ func (p *postgresInventoryRepository) GetCategories(ctx context.Context) ([]doma
|
||||
}
|
||||
categories = append(categories, c)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(categories) > 0 {
|
||||
p.cache.Set(cacheKeyInventoryCategories, categories, cacheTTLStatic)
|
||||
}
|
||||
|
||||
return categories, nil
|
||||
}
|
||||
|
||||
@ -2,18 +2,28 @@ package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/forfarm/backend/internal/cache"
|
||||
"github.com/forfarm/backend/internal/domain"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
cacheKeyPlantsAll = "plants:all"
|
||||
cacheKeyPlantPrefix = "plant:uuid:"
|
||||
cacheTTLStatic = 1 * time.Hour // Cache static lists for 1 hour
|
||||
)
|
||||
|
||||
type postgresPlantRepository struct {
|
||||
conn Connection
|
||||
conn Connection
|
||||
cache cache.Cache
|
||||
}
|
||||
|
||||
func NewPostgresPlant(conn Connection) domain.PlantRepository {
|
||||
return &postgresPlantRepository{conn: conn}
|
||||
func NewPostgresPlant(conn Connection, c cache.Cache) domain.PlantRepository {
|
||||
return &postgresPlantRepository{conn: conn, cache: c}
|
||||
}
|
||||
|
||||
func (p *postgresPlantRepository) fetch(ctx context.Context, query string, args ...interface{}) ([]domain.Plant, error) {
|
||||
@ -43,12 +53,27 @@ func (p *postgresPlantRepository) fetch(ctx context.Context, query string, args
|
||||
}
|
||||
|
||||
func (p *postgresPlantRepository) GetByUUID(ctx context.Context, uuid string) (domain.Plant, error) {
|
||||
// Check cache first
|
||||
cacheKey := cacheKeyPlantPrefix + uuid
|
||||
if cached, found := p.cache.Get(cacheKey); found {
|
||||
if plant, ok := cached.(domain.Plant); ok {
|
||||
slog.DebugContext(ctx, "Cache hit for GetPlantByUUID", "key", cacheKey)
|
||||
return plant, nil
|
||||
}
|
||||
}
|
||||
slog.DebugContext(ctx, "Cache miss for GetPlantByUUID", "key", cacheKey)
|
||||
|
||||
query := `SELECT * FROM plants WHERE uuid = $1`
|
||||
plants, err := p.fetch(ctx, query, uuid)
|
||||
if err != nil || len(plants) == 0 {
|
||||
if err != nil {
|
||||
return domain.Plant{}, err
|
||||
}
|
||||
if len(plants) == 0 {
|
||||
return domain.Plant{}, domain.ErrNotFound
|
||||
}
|
||||
return plants[0], nil
|
||||
plant := plants[0]
|
||||
p.cache.Set(cacheKey, plant, cacheTTLStatic)
|
||||
return plant, nil
|
||||
}
|
||||
|
||||
func (p *postgresPlantRepository) GetByName(ctx context.Context, name string) (domain.Plant, error) {
|
||||
@ -61,8 +86,25 @@ func (p *postgresPlantRepository) GetByName(ctx context.Context, name string) (d
|
||||
}
|
||||
|
||||
func (p *postgresPlantRepository) GetAll(ctx context.Context) ([]domain.Plant, error) {
|
||||
if cached, found := p.cache.Get(cacheKeyPlantsAll); found {
|
||||
if plants, ok := cached.([]domain.Plant); ok {
|
||||
slog.DebugContext(ctx, "Cache hit for GetAllPlants", "key", cacheKeyPlantsAll)
|
||||
return plants, nil
|
||||
}
|
||||
}
|
||||
slog.DebugContext(ctx, "Cache miss for GetAllPlants", "key", cacheKeyPlantsAll)
|
||||
|
||||
query := `SELECT * FROM plants`
|
||||
return p.fetch(ctx, query)
|
||||
plants, err := p.fetch(ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(plants) > 0 {
|
||||
p.cache.Set(cacheKeyPlantsAll, plants, cacheTTLStatic)
|
||||
}
|
||||
|
||||
return plants, nil
|
||||
}
|
||||
|
||||
func (p *postgresPlantRepository) Create(ctx context.Context, plant *domain.Plant) error {
|
||||
@ -74,7 +116,13 @@ func (p *postgresPlantRepository) Create(ctx context.Context, plant *domain.Plan
|
||||
}
|
||||
query := `INSERT INTO plants (uuid, name, light_profile_id, soil_condition_id, harvest_unit_id, created_at, updated_at)
|
||||
VALUES ($1, $2, $3, $4, $5, NOW(), NOW()) RETURNING created_at, updated_at`
|
||||
return p.conn.QueryRow(ctx, query, plant.UUID, plant.Name, plant.LightProfileID, plant.SoilConditionID, plant.HarvestUnitID).Scan(&plant.CreatedAt, &plant.UpdatedAt)
|
||||
err := p.conn.QueryRow(ctx, query, plant.UUID, plant.Name, plant.LightProfileID, plant.SoilConditionID, plant.HarvestUnitID).Scan(&plant.CreatedAt, &plant.UpdatedAt)
|
||||
|
||||
if err == nil {
|
||||
p.cache.Delete(cacheKeyPlantsAll)
|
||||
slog.DebugContext(ctx, "Cache invalidated", "key", cacheKeyPlantsAll)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *postgresPlantRepository) Update(ctx context.Context, plant *domain.Plant) error {
|
||||
@ -84,11 +132,21 @@ func (p *postgresPlantRepository) Update(ctx context.Context, plant *domain.Plan
|
||||
query := `UPDATE plants SET name = $2, light_profile_id = $3, soil_condition_id = $4,
|
||||
harvest_unit_id = $5, updated_at = NOW() WHERE uuid = $1`
|
||||
_, err := p.conn.Exec(ctx, query, plant.UUID, plant.Name, plant.LightProfileID, plant.SoilConditionID, plant.HarvestUnitID)
|
||||
if err == nil {
|
||||
p.cache.Delete(cacheKeyPlantsAll)
|
||||
p.cache.Delete(cacheKeyPlantPrefix + plant.UUID)
|
||||
slog.DebugContext(ctx, "Cache invalidated", "keys", []string{cacheKeyPlantsAll, cacheKeyPlantPrefix + plant.UUID})
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *postgresPlantRepository) Delete(ctx context.Context, uuid string) error {
|
||||
query := `DELETE FROM plants WHERE uuid = $1`
|
||||
_, err := p.conn.Exec(ctx, query, uuid)
|
||||
if err == nil {
|
||||
p.cache.Delete(cacheKeyPlantsAll)
|
||||
p.cache.Delete(cacheKeyPlantPrefix + uuid)
|
||||
slog.DebugContext(ctx, "Cache invalidated", "keys", []string{cacheKeyPlantsAll, cacheKeyPlantPrefix + uuid})
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user