129 lines
2.8 KiB
Go
129 lines
2.8 KiB
Go
package nats
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/nats-io/nats.go"
|
|
|
|
"github.com/chaitin/panda-wiki/config"
|
|
"github.com/chaitin/panda-wiki/log"
|
|
)
|
|
|
|
type MQProducer struct {
|
|
conn *nats.Conn
|
|
js nats.JetStreamContext
|
|
logger *log.Logger
|
|
}
|
|
|
|
func (p *MQProducer) EnsureStreams() error {
|
|
streams := []struct {
|
|
name string
|
|
subjects []string
|
|
}{
|
|
{
|
|
name: "task",
|
|
subjects: []string{"apps.panda-wiki.summary.task", "apps.panda-wiki.vector.task"},
|
|
},
|
|
{
|
|
name: "scraper",
|
|
subjects: []string{"apps.panda-wiki.scraper.>"},
|
|
},
|
|
}
|
|
|
|
for _, stream := range streams {
|
|
_, err := p.js.StreamInfo(stream.name)
|
|
if err == nil {
|
|
p.logger.Debug("stream already exists",
|
|
log.String("stream", stream.name))
|
|
continue
|
|
}
|
|
|
|
// Stream doesn't exist, create it
|
|
_, err = p.js.AddStream(&nats.StreamConfig{
|
|
Name: stream.name,
|
|
Subjects: stream.subjects,
|
|
Storage: nats.FileStorage,
|
|
Retention: nats.LimitsPolicy,
|
|
Discard: nats.DiscardOld,
|
|
MaxAge: 7 * 24 * time.Hour,
|
|
MaxBytes: 1 * 1024 * 1024 * 1024,
|
|
MaxMsgs: 1000000,
|
|
MaxMsgSize: 50 * 1024 * 1024,
|
|
Replicas: 1,
|
|
Duplicates: 120 * time.Second,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create stream %s: %w", stream.name, err)
|
|
}
|
|
|
|
p.logger.Info("created stream",
|
|
log.String("stream", stream.name),
|
|
log.Any("subjects", stream.subjects))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func NewMQProducer(config *config.Config, logger *log.Logger) (*MQProducer, error) {
|
|
opts := []nats.Option{
|
|
nats.Name("panda-wiki"),
|
|
}
|
|
|
|
if user := config.MQ.NATS.User; user != "" {
|
|
opts = append(opts, nats.UserInfo(user, config.MQ.NATS.Password))
|
|
}
|
|
|
|
server := config.MQ.NATS.Server
|
|
|
|
conn, err := nats.Connect(server, opts...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to connect to NATS: %w", err)
|
|
}
|
|
|
|
js, err := conn.JetStream()
|
|
if err != nil {
|
|
conn.Close()
|
|
return nil, fmt.Errorf("failed to get JetStream context: %w", err)
|
|
}
|
|
|
|
producer := &MQProducer{
|
|
conn: conn,
|
|
js: js,
|
|
logger: logger,
|
|
}
|
|
|
|
// Ensure streams exist
|
|
if err := producer.EnsureStreams(); err != nil {
|
|
conn.Close()
|
|
return nil, fmt.Errorf("failed to ensure streams: %w", err)
|
|
}
|
|
|
|
return producer, nil
|
|
}
|
|
|
|
func (p *MQProducer) Produce(ctx context.Context, topic string, key string, value []byte) error {
|
|
p.logger.Debug("publishing message",
|
|
log.String("topic", topic),
|
|
log.String("key", key),
|
|
log.Int("value_size", len(value)))
|
|
|
|
_, err := p.js.Publish(topic, value)
|
|
if err != nil {
|
|
p.logger.Error("failed to publish message",
|
|
log.String("topic", topic),
|
|
log.Error(err))
|
|
return fmt.Errorf("failed to publish message: %w", err)
|
|
}
|
|
|
|
p.logger.Debug("message published successfully",
|
|
log.String("topic", topic))
|
|
return nil
|
|
}
|
|
|
|
func (p *MQProducer) Close() error {
|
|
p.conn.Close()
|
|
return nil
|
|
}
|