amocrm/internal/workers/data_updater/data_updater.go
2024-05-20 12:26:21 +03:00

63 lines
1.2 KiB
Go

package data_updater
import (
"amocrm/internal/brokers"
"amocrm/internal/models"
"context"
"go.uber.org/zap"
"time"
)
// Deps bundles the dependencies that NewDataUpdaterWC copies into a DataUpdater.
type Deps struct {
// Logger is the zap logger the worker reports errors to.
Logger *zap.Logger
// Producer is the broker producer the worker uses to send update messages.
Producer *brokers.Producer
}
// DataUpdater is a background worker that periodically sends an
// "update all data" message through its producer (see Start).
type DataUpdater struct {
// logger receives an error entry when sending the message fails.
logger *zap.Logger
// producer is used to publish the update message.
producer *brokers.Producer
}
// NewDataUpdaterWC builds a DataUpdater from the supplied dependencies.
// The logger and producer are taken from deps as-is; no validation is done.
func NewDataUpdaterWC(deps Deps) *DataUpdater {
	updater := new(DataUpdater)
	updater.logger = deps.Logger
	updater.producer = deps.Producer
	return updater
}
// Start runs the worker loop and blocks until ctx is cancelled.
//
// The delay before each run is taken from calculateTime, interpreted as a
// number of seconds. After every tick processTasks is invoked and the ticker
// is re-armed with a freshly calculated delay.
//
// time.NewTicker and Ticker.Reset panic on a non-positive duration, so every
// interval (including the very first one) is replaced by a 24-hour fallback
// when it is not strictly positive.
func (wc *DataUpdater) Start(ctx context.Context) {
	// Same value as the 86400-second fallback used for re-arming the ticker.
	const fallbackInterval = 24 * time.Hour

	// nextInterval turns the current calculateTime result into a duration
	// that is always safe to hand to NewTicker / Reset.
	nextInterval := func() time.Duration {
		interval := time.Second * time.Duration(calculateTime())
		if interval <= 0 {
			return fallbackInterval
		}
		return interval
	}

	ticker := time.NewTicker(nextInterval())
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			wc.processTasks(ctx)
			// The delay until the next run is recalculated after each run.
			ticker.Reset(nextInterval())
		case <-ctx.Done():
			return
		}
	}
}
// processTasks publishes a single AllDataUpdate message with an empty
// AccountID via the producer. A send failure is logged and otherwise ignored;
// nothing is returned to the caller.
func (wc *DataUpdater) processTasks(ctx context.Context) {
	msg := models.KafkaMessage{
		Type:      models.AllDataUpdate,
		AccountID: "",
	}

	if err := wc.producer.ToKafkaUpdate(ctx, msg); err != nil {
		wc.logger.Error("error send task in kafka ro update all data", zap.Error(err))
	}
}
// Stop is a no-op that always returns nil; the context argument is ignored.
// The loop started by Start ends when the context given to Start is cancelled.
func (wc *DataUpdater) Stop(_ context.Context) error {
	return nil
}