diff --git a/internal/workers/data_updater/data_updater.go b/internal/workers/data_updater/data_updater.go index 93ae13a..541ff2d 100644 --- a/internal/workers/data_updater/data_updater.go +++ b/internal/workers/data_updater/data_updater.go @@ -6,6 +6,7 @@ import ( "context" "go.uber.org/zap" "time" + "fmt" ) type Deps struct { @@ -32,14 +33,21 @@ func (wc *DataUpdater) Start(ctx context.Context) { defer ticker.Stop() for { - select { - case <-ticker.C: - wc.processTasks(ctx) - nextStart = calculateTime() - ticker.Reset(time.Nanosecond * time.Duration(nextStart)) - case <-ctx.Done(): - return - } + func () { + defer func() { + if v := recover();v!=nil{ + fmt.Println("RECOVERING in DataUpdater",v) + } + } () + select { + case <-ticker.C: + wc.processTasks(ctx) + nextStart = calculateTime() + ticker.Reset(time.Nanosecond * time.Duration(nextStart)) + case <-ctx.Done(): + return + } + }() } } diff --git a/internal/workers/post_deals_worker/deals_worker.go b/internal/workers/post_deals_worker/deals_worker.go index 4d014e0..bcfd85c 100644 --- a/internal/workers/post_deals_worker/deals_worker.go +++ b/internal/workers/post_deals_worker/deals_worker.go @@ -55,54 +55,60 @@ func (wc *PostDeals) Start(ctx context.Context) { } () for { - select { - case <-ticker.C: - fmt.Println("StartFetching") - wc.startFetching(ctx) + func () { + defer func() { + if v := recover();v != nil { + fmt.Println("RECOVERING in PostDeals",v) + } + } () + select { + case <-ticker.C: + fmt.Println("StartFetching") + wc.startFetching(ctx) - case <-ctx.Done(): - fmt.Println("Done") - return - } + case <-ctx.Done(): + fmt.Println("Done") + return + } + }() } } -func (wc *PostDeals) startFetching(ctx context.Context) { - results, err := wc.amoRepo.AmoRepo.GettingAmoUsersTrueResults(ctx) - fmt.Println("StartFetchingResults", results,err) - if err != nil { - wc.logger.Error("error fetching users answers true results, for sending data to amo", zap.Error(err)) - return - } - - mapDealReq := make(map[string][]models.DealReq) - mapTokenDomain := make(map[string]string) - - for _, result := range results { +func (wc *PostDeals) processResult(ctx context.Context, + result model.AmoUsersTrueResults, + mapDealReq *map[string][]models.DealReq, + mapTokenDomain *map[string]string, +) error { + defer func () { + if v := recover(); v != nil { + fmt.Println("RECOVERING in processResult", v) + } + } () fmt.Println("StartFetchingResult", result) userPrivileges, err := wc.amoRepo.AccountRepo.GetPrivilegesByAccountID(ctx, result.QuizAccountID) fmt.Println("StartFetchingUP", userPrivileges, err) if err != nil { wc.logger.Error("error getting user privileges", zap.Error(err)) - return + return err } if !utils.VerifyUserPrivileges(userPrivileges) { wc.logger.Info("User don't have active quizCnt or quizUnlim privileges, aborting") - continue + //TODO: acquire results only for accounts with active privileges + return nil } allAnswers, err := wc.amoRepo.AnswerRepo.GetAllAnswersByQuizID(ctx, result.Session) fmt.Println("StartFetchingAA", allAnswers, err) if err != nil { wc.logger.Error("error getting all user answers by result session", zap.Error(err)) - return + return err } userTags, err := wc.amoRepo.AmoRepo.GetUserTagsByID(ctx, result.AmoAccountID) fmt.Println("StartFetchingUT", userTags, err) if err != nil { wc.logger.Error("error getting user tags by ano account id", zap.Error(err)) - return + return err } // За один запрос можно передать не более 50 сделок. @@ -129,14 +135,14 @@ func (wc *PostDeals) startFetching(ctx context.Context) { fmt.Println("StartFetchingCF", leadFields, contactData, companyData, customerToCreate, err) if err != nil { wc.logger.Error("error construct fields", zap.Error(err)) - return + return err } currentFields, err := wc.amoRepo.AmoRepo.GetUserFieldsByID(ctx, result.AmoAccountID) fmt.Println("StartFetchingcuF", currentFields, err) if err != nil { wc.logger.Error("error getting current user fields from db", zap.Error(err)) - return + return err } utmFields := tools.ConstructUTMFields(result.UTMs, currentFields) @@ -145,13 +151,13 @@ func (wc *PostDeals) startFetching(ctx context.Context) { fmt.Println("StartFetchingcC", currentFields, err) if err != nil { wc.logger.Error("error sending requests for create customer", zap.Error(err)) - continue + return err } err = wc.redisRepo.CachingLeadFieldsToRedis(ctx, result.AnswerID, leadFields) if err != nil { wc.logger.Error("error saving leads fields in redis", zap.Error(err)) - return + return err } deal.Embed.Contact = contactData @@ -160,18 +166,38 @@ func (wc *PostDeals) startFetching(ctx context.Context) { wc.logger.Info("NOW DEAL CONSTRUCTED IS:", zap.Any("DEAL", deal)) - if len(mapDealReq[result.AccessToken]) >= 49 { + if len((*mapDealReq)[result.AccessToken]) >= 49 { wc.logger.Info("reached maximum number of deals for access token", zap.String("access_token", result.AccessToken)) - err = wc.sendingDealsReq(ctx, mapDealReq, mapTokenDomain) + err = wc.sendingDealsReq(ctx, *mapDealReq, *mapTokenDomain) if err != nil { wc.logger.Error("error sending requests for create deals", zap.Error(err)) - return + return err } - mapDealReq = make(map[string][]models.DealReq) + mapDealReqTemp := make(map[string][]models.DealReq) + mapDealReq = &mapDealReqTemp } - mapDealReq[result.AccessToken] = append(mapDealReq[result.AccessToken], deal) - mapTokenDomain[result.AccessToken] = result.SubDomain + (*mapDealReq)[result.AccessToken] = append((*mapDealReq)[result.AccessToken], deal) + (*mapTokenDomain)[result.AccessToken] = result.SubDomain + + return nil +} + +func (wc *PostDeals) startFetching(ctx context.Context) { + results, err := wc.amoRepo.AmoRepo.GettingAmoUsersTrueResults(ctx) + fmt.Println("StartFetchingResults", results,err) + if err != nil { + wc.logger.Error("error fetching users answers true results, for sending data to amo", zap.Error(err)) + return + } + + mapDealReq := make(map[string][]models.DealReq) + mapTokenDomain := make(map[string]string) + + for _, result := range results { + if err := wc.processResult(ctx, result, &mapDealReq, &mapTokenDomain); err != nil { + wc.logger.Error("error processing result", zap.Error(err)) + } } err = wc.sendingDealsReq(ctx, mapDealReq, mapTokenDomain) diff --git a/internal/workers/post_fields_worker/fields_worker.go b/internal/workers/post_fields_worker/fields_worker.go index e391f0a..cadca31 100644 --- a/internal/workers/post_fields_worker/fields_worker.go +++ b/internal/workers/post_fields_worker/fields_worker.go @@ -40,14 +40,51 @@ func (wc *PostFields) Start(ctx context.Context) { defer ticker.Stop() for { - select { - case <-ticker.C: - wc.processTask(ctx) + func () { + defer func () { + if v := recover(); v!=nil{ + fmt.Println("RECOVERING in PostFields",v) + } + } () + select { + case <-ticker.C: + wc.processTask(ctx) - case <-ctx.Done(): - return + case <-ctx.Done(): + return + } + } () + } +} + +func (wc *PostFields) processDeal(ctx context.Context, token string, dealsData []models.MappingDealsData, forRestoringMap map[int32]models.ForRestoringData) error { + defer func () { + if v:=recover(); v!= nil { + fmt.Println("RECOVERING inprocessDeal", v) + } + } () + + errorCheckerMap, err := wc.sendForUpdate(ctx, token, dealsData) + if err != nil { + wc.logger.Error("error updating deals fields in db", zap.Error(err)) + return err + } + + for dealID, _ := range errorCheckerMap { + restoringData := forRestoringMap[dealID] + err = wc.redisRepo.CachingDealToRedis(ctx, restoringData.SaveDeal) + if err != nil { + wc.logger.Error("error restoring deal in redis", zap.Error(err)) + return err + } + + err = wc.redisRepo.CachingLeadFieldsToRedis(ctx, restoringData.SaveDeal.AnswerID, restoringData.LeadFields) + if err != nil { + wc.logger.Error("error restoring deal fields in redis", zap.Error(err)) + return err } } + return nil } func (wc *PostFields) processTask(ctx context.Context) { @@ -57,24 +94,8 @@ func (wc *PostFields) processTask(ctx context.Context) { return } for token, dealsData := range dealsDataForUpdate { - errorCheckerMap, err := wc.sendForUpdate(ctx, token, dealsData) - if err != nil { - wc.logger.Error("error updating deals fields in db", zap.Error(err)) - } - - for dealID, _ := range errorCheckerMap { - restoringData := forRestoringMap[dealID] - err = wc.redisRepo.CachingDealToRedis(ctx, restoringData.SaveDeal) - if err != nil { - wc.logger.Error("error restoring deal in redis", zap.Error(err)) - return - } - - err = wc.redisRepo.CachingLeadFieldsToRedis(ctx, restoringData.SaveDeal.AnswerID, restoringData.LeadFields) - if err != nil { - wc.logger.Error("error restoring deal fields in redis", zap.Error(err)) - return - } + if err := wc.processDeal(ctx, token, dealsData, forRestoringMap); err != nil { + wc.logger.Error("error processDeal", zap.Error(err)) } } }