diff --git a/internal/app/app.go b/internal/app/app.go index 2a236ca..22c8fc2 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -63,6 +63,7 @@ func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) erro MdbUser: mdb.Collection("amoUsers"), Tokens: mdb.Collection("tokens"), Pipelines: mdb.Collection("pipelines"), + Steps: mdb.Collection("steps"), Logger: logger, }) diff --git a/internal/models/step.go b/internal/models/step.go index b29b87e..2090a4b 100644 --- a/internal/models/step.go +++ b/internal/models/step.go @@ -2,19 +2,21 @@ package models type Step struct { /* - айдишник воронки в амо*/ - Pipelineid int `json:"PipelineID"` + Pipelineid int `json:"PipelineID" bson:"pipelineid"` /* - связь с аккаунтом в интеграции амо*/ - Accountid string `json:"AccountID"` + Accountid int `json:"AccountID" bson:"accountid"` /* - айдишник шага воронки в амо*/ - Amoid int `json:"AmoID"` + Amoid int `json:"AmoID" bson:"amoid"` /* - цвет шага в амо*/ - Color string `json:"Color"` + Color string `json:"Color" bson:"color"` /* - таймштамп создания воронки в нашей системе*/ - Createdat int `json:"CreatedAt"` + Createdat int64 `json:"CreatedAt" bson:"createdat"` + // время обновления + UpdateAt int64 `json:"UpdateAt" bson:"updateAt"` /* - флаг мягкого удаления*/ - Deleted bool `json:"Deleted"` + Deleted bool `json:"Deleted" bson:"deleted"` /* - айдишник в нашей системе*/ - ID int `json:"ID"` + ID string `json:"ID" bson:"ID"` /* - название воронки в амо*/ - Name string `json:"Name"` + Name string `json:"Name" bson:"name"` } diff --git a/internal/repository/repository.go b/internal/repository/initial.go similarity index 86% rename from internal/repository/repository.go rename to internal/repository/initial.go index d13b91f..0131e44 100644 --- a/internal/repository/repository.go +++ b/internal/repository/initial.go @@ -9,6 +9,7 @@ type Deps struct { MdbUser *mongo.Collection Tokens *mongo.Collection Pipelines *mongo.Collection + Steps *mongo.Collection Logger *zap.Logger } @@ -16,6 +17,7 @@ type Repository struct { mdbUser *mongo.Collection tokens *mongo.Collection pipelines *mongo.Collection + steps *mongo.Collection logger *zap.Logger } @@ -24,6 +26,7 @@ func NewRepository(deps Deps) *Repository { mdbUser: deps.MdbUser, tokens: deps.Tokens, pipelines: deps.Pipelines, + steps: deps.Steps, logger: deps.Logger, } } diff --git a/internal/repository/steps.go b/internal/repository/steps.go index ca81549..52d5745 100644 --- a/internal/repository/steps.go +++ b/internal/repository/steps.go @@ -2,7 +2,11 @@ package repository import ( "amocrm/internal/models" + "amocrm/internal/models/amo" "context" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "time" ) func (r *Repository) GettingStepsFromCash(ctx context.Context) (*models.UserListStepsResp, error) { @@ -17,3 +21,69 @@ func (r *Repository) UpdateListSteps(ctx context.Context) error { return nil } + +func (r *Repository) CheckSteps(ctx context.Context, accountID string, steps []amo.Statuses) error { + for _, s := range steps { + step := models.Step{ + ID: accountID, + Pipelineid: s.PipelineID, + Name: s.Name, + Accountid: s.AccountID, + Amoid: s.ID, + Color: s.Color, + } + + existingStep, err := r.GetStepByID(ctx, accountID, s.ID) + if err != nil { + return err + } + + if existingStep != nil { + step.UpdateAt = time.Now().Unix() + err := r.UpdateStep(ctx, &step) + if err != nil { + return err + } + } else { + step.Createdat = time.Now().Unix() + err := r.InsertStep(ctx, &step) + if err != nil { + return err + } + } + } + + return nil +} + +func (r *Repository) GetStepByID(ctx context.Context, accountID string, amoid int) (*models.Step, error) { + var step models.Step + filter := bson.M{"id": accountID, "amoid": amoid} + err := r.steps.FindOne(ctx, filter).Decode(&step) + if err == mongo.ErrNoDocuments { + return nil, nil + } + if err != nil { + return nil, err + } + return &step, nil +} + +func (r *Repository) UpdateStep(ctx context.Context, step *models.Step) error { + filter := bson.M{"id": step.ID, "amoid": step.Amoid} + update := bson.M{"$set": bson.M{ + "accountid": step.Accountid, + "name": step.Name, + "updateAt": step.UpdateAt, + "deleted": step.Deleted, + "color": step.Color, + "pipelineid": step.Pipelineid, + }} + _, err := r.steps.UpdateOne(ctx, filter, update) + return err +} + +func (r *Repository) InsertStep(ctx context.Context, step *models.Step) error { + _, err := r.steps.InsertOne(ctx, step) + return err +} diff --git a/internal/workers/data_updater/data_updater.go b/internal/workers/data_updater/data_updater.go index 0e0bb62..0181cec 100644 --- a/internal/workers/data_updater/data_updater.go +++ b/internal/workers/data_updater/data_updater.go @@ -61,6 +61,19 @@ func (wc *DataUpdater) processTasks(ctx context.Context) { if err != nil { wc.logger.Error("error update pipelines in mongo:", zap.Error(err)) } + + for _, pipeline := range pipelines.Embedded.Pipelines { + steps, err := wc.amoClient.GetListSteps(pipeline.ID, token.AccessToken) + if err != nil { + wc.logger.Error("error getting list steps pipeline:", zap.Error(err)) + continue + } + + err = wc.repo.CheckSteps(ctx, token.AccountID, steps.Embedded.Statuses) + if err != nil { + wc.logger.Error("error update pipeline steps in mongo:", zap.Error(err)) + } + } } } diff --git a/openapi.yaml b/openapi.yaml index 0542477..4d20910 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -511,7 +511,7 @@ components: description: объект шага воронки амо properties: ID: - type: integer + type: string description: айдишник в нашей системе AmoID: type: integer diff --git a/pkg/amoClient/amo.go b/pkg/amoClient/amo.go index fc039f2..9afd78d 100644 --- a/pkg/amoClient/amo.go +++ b/pkg/amoClient/amo.go @@ -124,7 +124,7 @@ func (a *Amo) CreateWebHook(req amo2.WebHookRequest) (*amo2.CreateWebHookResp, e } if statusCode != fiber.StatusOK { - errorMessage := fmt.Sprintf("received an incorrect response from CreateWebHook: %d", statusCode) + errorMessage := fmt.Sprintf("received an incorrect response from CreateWebHook: %s", string(resBody)) a.logger.Error(errorMessage, zap.Int("status", statusCode)) return nil, fmt.Errorf(errorMessage) } @@ -155,12 +155,12 @@ func (a *Amo) DeleteWebHook() { // https://www.amocrm.ru/developers/content/crm_platform/leads_pipelines#%D0%A1%D0%BF%D0%B8%D1%81%D0%BE%D0%BA-%D1%81%D1%82%D0%B0%D1%82%D1%83%D1%81%D0%BE%D0%B2-%D0%B2%D0%BE%D1%80%D0%BE%D0%BD%D0%BA%D0%B8-%D1%81%D0%B4%D0%B5%D0%BB%D0%BE%D0%BA // GET /api/v4/leads/pipelines/{pipeline_id}/statuses -func (a *Amo) GetListSteps(pipelineID int) (*amo2.ResponseGetListSteps, error) { +func (a *Amo) GetListSteps(pipelineID int, accessToken string) (*amo2.ResponseGetListSteps, error) { for { if a.rateLimiter.Check() { uri := fmt.Sprintf("%s/api/v4/leads/pipelines/%d/statuses", a.baseApiURL, pipelineID) - agent := a.fiberClient.Get(uri) + agent.Set("Authorization", "Bearer "+accessToken) statusCode, resBody, errs := agent.Bytes() if len(errs) > 0 { for _, err := range errs { @@ -170,7 +170,7 @@ func (a *Amo) GetListSteps(pipelineID int) (*amo2.ResponseGetListSteps, error) { } if statusCode != fiber.StatusOK { - errorMessage := fmt.Sprintf("received an incorrect response from GetListSteps: %d", statusCode) + errorMessage := fmt.Sprintf("received an incorrect response from GetListSteps: %s", string(resBody)) a.logger.Error(errorMessage, zap.Int("status", statusCode)) return nil, fmt.Errorf(errorMessage) } @@ -211,7 +211,7 @@ func (a *Amo) GetListFields(req amo2.GetListFieldsReq, url string) (*amo2.Respon } if statusCode != fiber.StatusOK { - errorMessage := fmt.Sprintf("received an incorrect response from GetListFields: %d", statusCode) + errorMessage := fmt.Sprintf("received an incorrect response from GetListFields: %s", string(resBody)) a.logger.Error(errorMessage, zap.Int("status", statusCode)) return nil, fmt.Errorf(errorMessage) } @@ -260,7 +260,7 @@ func (a *Amo) GetListTags(req amo2.GetListTagsReq) (*amo2.ResponseGetListTags, e } if statusCode != fiber.StatusOK { - errorMessage := fmt.Sprintf("received an incorrect response from GetListTags: %d", statusCode) + errorMessage := fmt.Sprintf("received an incorrect response from GetListTags: %s", string(resBody)) a.logger.Error(errorMessage, zap.Int("status", statusCode)) return nil, fmt.Errorf(errorMessage) } @@ -292,7 +292,7 @@ func (a *Amo) GetUserInfo(accessToken string) (*amo2.AmocrmUserInformation, erro } if statusCode != fiber.StatusOK { - errorMessage := fmt.Sprintf("received an incorrect response from GetUserInfo: %d", statusCode) + errorMessage := fmt.Sprintf("received an incorrect response from GetUserInfo: %s", string(resBody)) a.logger.Error(errorMessage, zap.Int("status", statusCode)) return nil, fmt.Errorf(errorMessage) } @@ -325,7 +325,7 @@ func (a *Amo) GetUserByID(accessToken string, id int64) (*amo2.OneUserInfo, erro } if statusCode != fiber.StatusOK { - errorMessage := fmt.Sprintf("received an incorrect response from GetUserByID: %d", statusCode) + errorMessage := fmt.Sprintf("received an incorrect response from GetUserByID:%s", string(resBody)) a.logger.Error(errorMessage, zap.Int("status", statusCode)) return nil, fmt.Errorf(errorMessage) } @@ -358,7 +358,7 @@ func (a *Amo) GetListPipelines(accessToken string) (*amo2.PipelineResponse, erro } if statusCode != fiber.StatusOK { - errorMessage := fmt.Sprintf("received an incorrect response from GetListPipelines: %d", statusCode) + errorMessage := fmt.Sprintf("received an incorrect response from GetListPipelines: %s", string(resBody)) a.logger.Error(errorMessage, zap.Int("status", statusCode)) return nil, fmt.Errorf(errorMessage) }