bitrix/internal/workers/data_updater/data_updater.go

60 lines
1.2 KiB
Go
Raw Normal View History

2024-09-30 07:51:24 +00:00
package data_updater
import (
"context"
"go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/brokers"
"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/models"
"time"
)
type Deps struct {
Logger *zap.Logger
Producer *brokers.Producer
}
type DataUpdater struct {
logger *zap.Logger
producer *brokers.Producer
}
func NewDataUpdaterWC(deps Deps) *DataUpdater {
return &DataUpdater{
logger: deps.Logger,
producer: deps.Producer,
}
}
func (wc *DataUpdater) Start(ctx context.Context) {
nextStart := calculateTime()
ticker := time.NewTicker(time.Nanosecond * time.Duration(nextStart))
//ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
wc.processTasks(ctx)
nextStart = calculateTime()
ticker.Reset(time.Nanosecond * time.Duration(nextStart))
case <-ctx.Done():
return
}
}
}
func (wc *DataUpdater) processTasks(ctx context.Context) {
err := wc.producer.ToKafkaUpdate(ctx, models.KafkaMessage{
Type: models.AllDataUpdate,
AccountID: "",
})
if err != nil {
wc.logger.Error("error send task in kafka ro update all data", zap.Error(err))
return
}
}
func (wc *DataUpdater) Stop(_ context.Context) error {
return nil
}