mirror of
https://github.com/ForFarmTeam/ForFarm.git
synced 2025-12-19 14:04:08 +01:00
151 lines
2.9 KiB
Go
151 lines
2.9 KiB
Go
package event
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"log/slog"
|
|
|
|
"github.com/forfarm/backend/internal/domain"
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
)
|
|
|
|
type RabbitMQEventBus struct {
|
|
conn *amqp.Connection
|
|
channel *amqp.Channel
|
|
logger *slog.Logger
|
|
}
|
|
|
|
func NewRabbitMQEventBus(url string, logger *slog.Logger) (*RabbitMQEventBus, error) {
|
|
conn, err := amqp.Dial(url)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ch, err := conn.Channel()
|
|
if err != nil {
|
|
conn.Close()
|
|
return nil, err
|
|
}
|
|
|
|
// Declare the exchange
|
|
err = ch.ExchangeDeclare(
|
|
"events", // name
|
|
"topic", // type
|
|
true, // durable
|
|
false, // auto-deleted
|
|
false, // internal
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
if err != nil {
|
|
ch.Close()
|
|
conn.Close()
|
|
return nil, err
|
|
}
|
|
|
|
return &RabbitMQEventBus{
|
|
conn: conn,
|
|
channel: ch,
|
|
logger: logger,
|
|
}, nil
|
|
}
|
|
|
|
func (r *RabbitMQEventBus) Publish(ctx context.Context, event domain.Event) error {
|
|
data, err := json.Marshal(event)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return r.channel.PublishWithContext(
|
|
ctx,
|
|
"events", // exchange
|
|
"events."+event.Type, // routing key
|
|
false, // mandatory
|
|
false, // immediate
|
|
amqp.Publishing{
|
|
ContentType: "application/json",
|
|
Body: data,
|
|
DeliveryMode: amqp.Persistent,
|
|
MessageId: event.ID,
|
|
Timestamp: event.Timestamp,
|
|
},
|
|
)
|
|
}
|
|
|
|
func (r *RabbitMQEventBus) Subscribe(ctx context.Context, eventType string, handler func(domain.Event) error) error {
|
|
// Declare a queue for this consumer
|
|
q, err := r.channel.QueueDeclare(
|
|
"", // name (empty = auto-generated)
|
|
false, // durable
|
|
true, // delete when unused
|
|
true, // exclusive
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Bind the queue to the exchange
|
|
err = r.channel.QueueBind(
|
|
q.Name, // queue name
|
|
"events."+eventType, // routing key
|
|
"events", // exchange
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Start consuming
|
|
msgs, err := r.channel.Consume(
|
|
q.Name, // queue
|
|
"", // consumer
|
|
false, // auto-ack
|
|
false, // exclusive
|
|
false, // no-local
|
|
false, // no-wait
|
|
nil, // args
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case msg, ok := <-msgs:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
var event domain.Event
|
|
if err := json.Unmarshal(msg.Body, &event); err != nil {
|
|
r.logger.Error("Failed to unmarshal event", "error", err)
|
|
msg.Nack(false, false)
|
|
continue
|
|
}
|
|
|
|
if err := handler(event); err != nil {
|
|
r.logger.Error("Failed to handle event", "error", err)
|
|
msg.Nack(false, true) // requeue
|
|
} else {
|
|
msg.Ack(false)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *RabbitMQEventBus) Close() error {
|
|
if err := r.channel.Close(); err != nil {
|
|
return err
|
|
}
|
|
return r.conn.Close()
|
|
}
|