amocrm/internal/workers/data_updater/data_updater.go

68 lines
1.3 KiB
Go
Raw Normal View History

2024-04-11 12:45:01 +00:00
package data_updater
import (
"amocrm/internal/brokers"
"amocrm/internal/models"
2024-12-03 13:22:05 +00:00
"amocrm/pkg/timer"
2024-04-11 12:45:01 +00:00
"context"
"go.uber.org/zap"
"time"
2024-12-02 23:28:22 +00:00
"fmt"
2024-04-11 12:45:01 +00:00
)
type Deps struct {
Logger *zap.Logger
Producer *brokers.Producer
2024-04-11 12:45:01 +00:00
}
type DataUpdater struct {
logger *zap.Logger
producer *brokers.Producer
2024-04-11 12:45:01 +00:00
}
func NewDataUpdaterWC(deps Deps) *DataUpdater {
return &DataUpdater{
logger: deps.Logger,
producer: deps.Producer,
2024-04-11 12:45:01 +00:00
}
}
func (wc *DataUpdater) Start(ctx context.Context) {
2024-12-03 13:22:05 +00:00
nextStart := timer.CalculateTime(4)
2024-05-20 09:36:04 +00:00
ticker := time.NewTicker(time.Nanosecond * time.Duration(nextStart))
2024-04-20 19:02:13 +00:00
//ticker := time.NewTicker(10 * time.Second)
2024-04-11 12:45:01 +00:00
defer ticker.Stop()
for {
2024-12-02 23:28:22 +00:00
func () {
defer func() {
if v := recover();v!=nil{
fmt.Println("RECOVERING in DataUpdater",v)
}
} ()
select {
case <-ticker.C:
wc.processTasks(ctx)
ticker.Reset(time.Nanosecond * time.Duration(nextStart))
case <-ctx.Done():
return
}
}()
2024-04-11 12:45:01 +00:00
}
}
func (wc *DataUpdater) processTasks(ctx context.Context) {
err := wc.producer.ToKafkaUpdate(ctx, models.KafkaMessage{
Type: models.AllDataUpdate,
AccountID: "",
})
2024-04-11 15:08:54 +00:00
if err != nil {
wc.logger.Error("error send task in kafka ro update all data", zap.Error(err))
2024-04-11 15:08:54 +00:00
return
}
2024-04-12 14:51:26 +00:00
}
2024-05-06 20:35:08 +00:00
func (wc *DataUpdater) Stop(_ context.Context) error {
2024-04-11 12:45:01 +00:00
return nil
}