package data_updater import ( "amocrm/internal/brokers" "amocrm/internal/models" "context" "go.uber.org/zap" "time" ) type Deps struct { Logger *zap.Logger Producer *brokers.Producer } type DataUpdater struct { logger *zap.Logger producer *brokers.Producer } func NewDataUpdaterWC(deps Deps) *DataUpdater { return &DataUpdater{ logger: deps.Logger, producer: deps.Producer, } } func (wc *DataUpdater) Start(ctx context.Context) { nextStart := calculateTime() ticker := time.NewTicker(time.Second * time.Duration(nextStart)) //ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: wc.processTasks(ctx) nextStart = calculateTime() ticker.Reset(time.Second * time.Duration(nextStart)) case <-ctx.Done(): return } } } func (wc *DataUpdater) processTasks(ctx context.Context) { err := wc.producer.ToKafkaUpdate(ctx, models.KafkaMessage{ Type: models.AllDataUpdate, AccountID: "", }) if err != nil { wc.logger.Error("error send task in kafka ro update all data", zap.Error(err)) return } } func (wc *DataUpdater) Stop(ctx context.Context) error { return nil }