more recovering

This commit is contained in:
skeris 2024-12-03 02:28:22 +03:00
parent df757e097b
commit 0e13df1f83
3 changed files with 120 additions and 65 deletions

@ -6,6 +6,7 @@ import (
"context"
"go.uber.org/zap"
"time"
"fmt"
)
type Deps struct {
@ -32,14 +33,21 @@ func (wc *DataUpdater) Start(ctx context.Context) {
defer ticker.Stop()
for {
select {
case <-ticker.C:
wc.processTasks(ctx)
nextStart = calculateTime()
ticker.Reset(time.Nanosecond * time.Duration(nextStart))
case <-ctx.Done():
return
}
func () {
defer func() {
if v := recover();v!=nil{
fmt.Println("RECOVERING in DataUpdater",v)
}
} ()
select {
case <-ticker.C:
wc.processTasks(ctx)
nextStart = calculateTime()
ticker.Reset(time.Nanosecond * time.Duration(nextStart))
case <-ctx.Done():
return
}
}()
}
}

@ -55,54 +55,60 @@ func (wc *PostDeals) Start(ctx context.Context) {
} ()
for {
select {
case <-ticker.C:
fmt.Println("StartFetching")
wc.startFetching(ctx)
func () {
defer func() {
if v := recover();v != nil {
fmt.Println("RECOVERING in PostDeals",v)
}
} ()
select {
case <-ticker.C:
fmt.Println("StartFetching")
wc.startFetching(ctx)
case <-ctx.Done():
fmt.Println("Done")
return
}
case <-ctx.Done():
fmt.Println("Done")
return
}
}()
}
}
func (wc *PostDeals) startFetching(ctx context.Context) {
results, err := wc.amoRepo.AmoRepo.GettingAmoUsersTrueResults(ctx)
fmt.Println("StartFetchingResults", results,err)
if err != nil {
wc.logger.Error("error fetching users answers true results, for sending data to amo", zap.Error(err))
return
}
mapDealReq := make(map[string][]models.DealReq)
mapTokenDomain := make(map[string]string)
for _, result := range results {
func (wc *PostDeals) processResult(ctx context.Context,
result model.AmoUsersTrueResults,
mapDealReq *map[string][]models.DealReq,
mapTokenDomain *map[string]string,
) error {
defer func () {
if v := recover(); v != nil {
fmt.Println("RECOVERING in processResult", v)
}
} ()
fmt.Println("StartFetchingResult", result)
userPrivileges, err := wc.amoRepo.AccountRepo.GetPrivilegesByAccountID(ctx, result.QuizAccountID)
fmt.Println("StartFetchingUP", userPrivileges, err)
if err != nil {
wc.logger.Error("error getting user privileges", zap.Error(err))
return
return err
}
if !utils.VerifyUserPrivileges(userPrivileges) {
wc.logger.Info("User don't have active quizCnt or quizUnlim privileges, aborting")
continue
//TODO: acquire results only for accounts with active privileges
return nil
}
allAnswers, err := wc.amoRepo.AnswerRepo.GetAllAnswersByQuizID(ctx, result.Session)
fmt.Println("StartFetchingAA", allAnswers, err)
if err != nil {
wc.logger.Error("error getting all user answers by result session", zap.Error(err))
return
return err
}
userTags, err := wc.amoRepo.AmoRepo.GetUserTagsByID(ctx, result.AmoAccountID)
fmt.Println("StartFetchingUT", userTags, err)
if err != nil {
wc.logger.Error("error getting user tags by ano account id", zap.Error(err))
return
return err
}
// За один запрос можно передать не более 50 сделок.
@ -129,14 +135,14 @@ func (wc *PostDeals) startFetching(ctx context.Context) {
fmt.Println("StartFetchingCF", leadFields, contactData, companyData, customerToCreate, err)
if err != nil {
wc.logger.Error("error construct fields", zap.Error(err))
return
return err
}
currentFields, err := wc.amoRepo.AmoRepo.GetUserFieldsByID(ctx, result.AmoAccountID)
fmt.Println("StartFetchingcuF", currentFields, err)
if err != nil {
wc.logger.Error("error getting current user fields from db", zap.Error(err))
return
return err
}
utmFields := tools.ConstructUTMFields(result.UTMs, currentFields)
@ -145,13 +151,13 @@ func (wc *PostDeals) startFetching(ctx context.Context) {
fmt.Println("StartFetchingcC", currentFields, err)
if err != nil {
wc.logger.Error("error sending requests for create customer", zap.Error(err))
continue
return err
}
err = wc.redisRepo.CachingLeadFieldsToRedis(ctx, result.AnswerID, leadFields)
if err != nil {
wc.logger.Error("error saving leads fields in redis", zap.Error(err))
return
return err
}
deal.Embed.Contact = contactData
@ -160,18 +166,38 @@ func (wc *PostDeals) startFetching(ctx context.Context) {
wc.logger.Info("NOW DEAL CONSTRUCTED IS:", zap.Any("DEAL", deal))
if len(mapDealReq[result.AccessToken]) >= 49 {
if len((*mapDealReq)[result.AccessToken]) >= 49 {
wc.logger.Info("reached maximum number of deals for access token", zap.String("access_token", result.AccessToken))
err = wc.sendingDealsReq(ctx, mapDealReq, mapTokenDomain)
err = wc.sendingDealsReq(ctx, *mapDealReq, *mapTokenDomain)
if err != nil {
wc.logger.Error("error sending requests for create deals", zap.Error(err))
return
return err
}
mapDealReq = make(map[string][]models.DealReq)
mapDealReqTemp := make(map[string][]models.DealReq)
mapDealReq = &mapDealReqTemp
}
mapDealReq[result.AccessToken] = append(mapDealReq[result.AccessToken], deal)
mapTokenDomain[result.AccessToken] = result.SubDomain
(*mapDealReq)[result.AccessToken] = append((*mapDealReq)[result.AccessToken], deal)
(*mapTokenDomain)[result.AccessToken] = result.SubDomain
return nil
}
func (wc *PostDeals) startFetching(ctx context.Context) {
results, err := wc.amoRepo.AmoRepo.GettingAmoUsersTrueResults(ctx)
fmt.Println("StartFetchingResults", results,err)
if err != nil {
wc.logger.Error("error fetching users answers true results, for sending data to amo", zap.Error(err))
return
}
mapDealReq := make(map[string][]models.DealReq)
mapTokenDomain := make(map[string]string)
for _, result := range results {
if err := wc.processResult(ctx, result, &mapDealReq, &mapTokenDomain); err != nil {
wc.logger.Error("error processing result", zap.Error(err))
}
}
err = wc.sendingDealsReq(ctx, mapDealReq, mapTokenDomain)

@ -40,14 +40,51 @@ func (wc *PostFields) Start(ctx context.Context) {
defer ticker.Stop()
for {
select {
case <-ticker.C:
wc.processTask(ctx)
func () {
defer func () {
if v := recover(); v!=nil{
fmt.Println("RECOVERING in PostFields",v)
}
} ()
select {
case <-ticker.C:
wc.processTask(ctx)
case <-ctx.Done():
return
case <-ctx.Done():
return
}
} ()
}
}
func (wc *PostFields) processDeal(ctx context.Context, token string, dealsData []models.MappingDealsData, forRestoringMap map[int32]models.ForRestoringData) error {
defer func () {
if v:=recover(); v!= nil {
fmt.Println("RECOVERING inprocessDeal", v)
}
} ()
errorCheckerMap, err := wc.sendForUpdate(ctx, token, dealsData)
if err != nil {
wc.logger.Error("error updating deals fields in db", zap.Error(err))
return err
}
for dealID, _ := range errorCheckerMap {
restoringData := forRestoringMap[dealID]
err = wc.redisRepo.CachingDealToRedis(ctx, restoringData.SaveDeal)
if err != nil {
wc.logger.Error("error restoring deal in redis", zap.Error(err))
return err
}
err = wc.redisRepo.CachingLeadFieldsToRedis(ctx, restoringData.SaveDeal.AnswerID, restoringData.LeadFields)
if err != nil {
wc.logger.Error("error restoring deal fields in redis", zap.Error(err))
return err
}
}
return nil
}
func (wc *PostFields) processTask(ctx context.Context) {
@ -57,24 +94,8 @@ func (wc *PostFields) processTask(ctx context.Context) {
return
}
for token, dealsData := range dealsDataForUpdate {
errorCheckerMap, err := wc.sendForUpdate(ctx, token, dealsData)
if err != nil {
wc.logger.Error("error updating deals fields in db", zap.Error(err))
}
for dealID, _ := range errorCheckerMap {
restoringData := forRestoringMap[dealID]
err = wc.redisRepo.CachingDealToRedis(ctx, restoringData.SaveDeal)
if err != nil {
wc.logger.Error("error restoring deal in redis", zap.Error(err))
return
}
err = wc.redisRepo.CachingLeadFieldsToRedis(ctx, restoringData.SaveDeal.AnswerID, restoringData.LeadFields)
if err != nil {
wc.logger.Error("error restoring deal fields in redis", zap.Error(err))
return
}
if err := wc.processDeal(ctx, token, dealsData, forRestoringMap); err != nil {
wc.logger.Error("error processDeal", zap.Error(err))
}
}
}