ForFarm/backend/internal/event/eventbus.go

151 lines
2.9 KiB
Go

package event
import (
"context"
"encoding/json"
"log/slog"
"github.com/forfarm/backend/internal/domain"
amqp "github.com/rabbitmq/amqp091-go"
)
type RabbitMQEventBus struct {
conn *amqp.Connection
channel *amqp.Channel
logger *slog.Logger
}
func NewRabbitMQEventBus(url string, logger *slog.Logger) (*RabbitMQEventBus, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, err
}
ch, err := conn.Channel()
if err != nil {
conn.Close()
return nil, err
}
// Declare the exchange
err = ch.ExchangeDeclare(
"events", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
ch.Close()
conn.Close()
return nil, err
}
return &RabbitMQEventBus{
conn: conn,
channel: ch,
logger: logger,
}, nil
}
func (r *RabbitMQEventBus) Publish(ctx context.Context, event domain.Event) error {
data, err := json.Marshal(event)
if err != nil {
return err
}
return r.channel.PublishWithContext(
ctx,
"events", // exchange
"events."+event.Type, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: data,
DeliveryMode: amqp.Persistent,
MessageId: event.ID,
Timestamp: event.Timestamp,
},
)
}
func (r *RabbitMQEventBus) Subscribe(ctx context.Context, eventType string, handler func(domain.Event) error) error {
// Declare a queue for this consumer
q, err := r.channel.QueueDeclare(
"", // name (empty = auto-generated)
false, // durable
true, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
// Bind the queue to the exchange
err = r.channel.QueueBind(
q.Name, // queue name
"events."+eventType, // routing key
"events", // exchange
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
// Start consuming
msgs, err := r.channel.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return err
}
go func() {
for {
select {
case <-ctx.Done():
return
case msg, ok := <-msgs:
if !ok {
return
}
var event domain.Event
if err := json.Unmarshal(msg.Body, &event); err != nil {
r.logger.Error("Failed to unmarshal event", "error", err)
msg.Nack(false, false)
continue
}
if err := handler(event); err != nil {
r.logger.Error("Failed to handle event", "error", err)
msg.Nack(false, true) // requeue
} else {
msg.Ack(false)
}
}
}
}()
return nil
}
func (r *RabbitMQEventBus) Close() error {
if err := r.channel.Close(); err != nil {
return err
}
return r.conn.Close()
}