add new welcome worker

This commit is contained in:
Pavel 2024-04-02 17:46:05 +03:00
parent 02bb326055
commit b0992638ad
5 changed files with 114 additions and 22 deletions

@ -5,11 +5,15 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"mailnotifier/internal/clients" "mailnotifier/internal/clients"
"mailnotifier/internal/initialize" "mailnotifier/internal/initialize"
"mailnotifier/internal/models"
"mailnotifier/internal/repository" "mailnotifier/internal/repository"
"mailnotifier/internal/workers" "mailnotifier/internal/workers"
) )
func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) error { func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) error {
welcomeChan := make(chan models.Message, 50)
mdb, err := initialize.MongoInit(ctx, config) mdb, err := initialize.MongoInit(ctx, config)
if err != nil { if err != nil {
logger.Error("Failed to initialize MongoDB", zap.Error(err)) logger.Error("Failed to initialize MongoDB", zap.Error(err))
@ -18,7 +22,7 @@ func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) erro
kafka, err := initialize.KafkaConsumerInit(ctx, config) kafka, err := initialize.KafkaConsumerInit(ctx, config)
repo := repository.NewRepository(mdb.Collection("notify")) repo := repository.NewRepository(mdb.Collection("notify"), welcomeChan)
mailClient := clients.NewMailClient(clients.Deps{ mailClient := clients.NewMailClient(clients.Deps{
SmtpHost: config.SmtpHost, SmtpHost: config.SmtpHost,
@ -43,7 +47,15 @@ func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) erro
Logger: logger, Logger: logger,
}) })
welcomer := workers.NewWelcomer(workers.WelcomerDeps{
Repo: repo,
MailClient: mailClient,
Logger: logger,
WelcomeChan: welcomeChan,
})
go consumer.Start(ctx) go consumer.Start(ctx)
go welcomer.Start(ctx)
go notifyer.Start(ctx) go notifyer.Start(ctx)
//todo gracefull showtdown wc //todo gracefull showtdown wc

@ -6,11 +6,14 @@ import (
) )
type Message struct { type Message struct {
ID primitive.ObjectID `bson:"_id"` ID primitive.ObjectID `bson:"_id"`
AccountID string `bson:"accountID"` // id аккаунта который что то сделал AccountID string `bson:"accountID"` // id аккаунта который что то сделал
Email string `bson:"email"` // его mail Email string `bson:"email"` // его mail
ServiceKey string `bson:"serviceKey"` // сервис с которого пришло сообщение ServiceKey string `bson:"serviceKey"` // сервис с которого пришло сообщение
EventType string `bson:"eventType"` // тип события вызвавшее отправку данных в кафку EventType string `bson:"eventType"` // тип события вызвавшее отправку данных в кафку
Send bool `bson:"send"` SendRegistration bool `bson:"sendRegistration"`
SendAt time.Time `bson:"sendAt"` SendNoneCreated bool `bson:"sendNoneCreated"`
SendUnpublished bool `bson:"sendUnpublished"`
SendPaid bool `bson:"sendPaid"`
SendAt time.Time `bson:"sendAt"` // или в unix?
} }

@ -9,24 +9,27 @@ import (
) )
type Repository struct { type Repository struct {
mdb *mongo.Collection mdb *mongo.Collection
welcomeChan chan models.Message
} }
func NewRepository(mdb *mongo.Collection) *Repository { func NewRepository(mdb *mongo.Collection, welcomeChan chan models.Message) *Repository {
return &Repository{ return &Repository{
mdb: mdb, mdb: mdb,
welcomeChan: welcomeChan,
} }
} }
// записываем каждый месседж по одному // записываем каждый месседж по одному
func (r *Repository) Insert(ctx context.Context, mes models.Message) error { func (r *Repository) Insert(ctx context.Context, mes models.Message) error {
mes.ID = primitive.NewObjectID() mes.ID = primitive.NewObjectID()
mes.Send = false
_, err := r.mdb.InsertOne(ctx, mes) _, err := r.mdb.InsertOne(ctx, mes)
if err != nil { if err != nil {
return err return err
} }
r.welcomeChan <- mes
return nil return nil
} }
@ -45,3 +48,23 @@ func (r *Repository) GetMany(ctx context.Context) ([]models.Message, error) {
return messages, nil return messages, nil
} }
// апдейтим то что отправили на почту
func (r *Repository) Update(ctx context.Context, message models.Message) error {
filter := bson.M{"_id": message.ID}
update := bson.M{
"$set": bson.M{
"sendRegistration": message.SendRegistration,
"sendNoneCreated": message.SendNoneCreated,
"sendUnpublished": message.SendUnpublished,
"sendPaid": message.SendPaid,
},
}
_, err := r.mdb.UpdateOne(ctx, filter, update)
if err != nil {
return err
}
return nil
}

@ -2,6 +2,7 @@ package workers
import ( import (
"context" "context"
"fmt"
"go.uber.org/zap" "go.uber.org/zap"
"mailnotifier/internal/clients" "mailnotifier/internal/clients"
"mailnotifier/internal/repository" "mailnotifier/internal/repository"
@ -50,15 +51,6 @@ func (n *Notifyer) notify(ctx context.Context) {
n.logger.Error("error getting records from mongo", zap.Error(err)) n.logger.Error("error getting records from mongo", zap.Error(err))
} }
for _, record := range records { for _, record := range records {
//тут тулзы которые распределяют записи по их принадлежности к определенному ивенту fmt.Println(record)
err = n.mailClient.MailSender(clients.SenderDeps{
Subject: "из тулзов",
Email: record.Email,
TmplPath: "из тулзов",
})
if err != nil {
n.logger.Error("error sending message to mailbox")
}
} }
} }

@ -0,0 +1,62 @@
package workers
import (
"context"
"go.uber.org/zap"
"mailnotifier/internal/clients"
"mailnotifier/internal/models"
"mailnotifier/internal/repository"
)
//todo тут будут заембенжены шаблоны
type Welcomer struct {
repo *repository.Repository
mailClient *clients.MailClient
logger *zap.Logger
welcomeChan <-chan models.Message
}
type WelcomerDeps struct {
Repo *repository.Repository
MailClient *clients.MailClient
Logger *zap.Logger
WelcomeChan chan models.Message
}
func NewWelcomer(deps WelcomerDeps) *Welcomer {
return &Welcomer{
repo: deps.Repo,
mailClient: deps.MailClient,
logger: deps.Logger,
welcomeChan: deps.WelcomeChan,
}
}
func (w *Welcomer) Start(ctx context.Context) {
for {
select {
case msg := <-w.welcomeChan:
w.welcome(ctx, msg)
case <-ctx.Done():
return
}
}
}
func (w *Welcomer) welcome(ctx context.Context, msg models.Message) {
err := w.mailClient.MailSender(clients.SenderDeps{
Subject: "registration",
Email: msg.Email,
TmplPath: "registration", // todo заменить на шаблон
})
if err != nil {
w.logger.Error("error sending message to mailbox", zap.Error(err))
}
msg.SendRegistration = true
err = w.repo.Update(ctx, msg)
if err != nil {
w.logger.Error("error updating record for registration", zap.Error(err))
}
}