add method CheckSteps in DataUpdater
This commit is contained in:
parent
890b6ed91e
commit
14a740a13d
@ -63,6 +63,7 @@ func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) erro
|
|||||||
MdbUser: mdb.Collection("amoUsers"),
|
MdbUser: mdb.Collection("amoUsers"),
|
||||||
Tokens: mdb.Collection("tokens"),
|
Tokens: mdb.Collection("tokens"),
|
||||||
Pipelines: mdb.Collection("pipelines"),
|
Pipelines: mdb.Collection("pipelines"),
|
||||||
|
Steps: mdb.Collection("steps"),
|
||||||
Logger: logger,
|
Logger: logger,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -2,19 +2,21 @@ package models
|
|||||||
|
|
||||||
type Step struct {
|
type Step struct {
|
||||||
/* - айдишник воронки в амо*/
|
/* - айдишник воронки в амо*/
|
||||||
Pipelineid int `json:"PipelineID"`
|
Pipelineid int `json:"PipelineID" bson:"pipelineid"`
|
||||||
/* - связь с аккаунтом в интеграции амо*/
|
/* - связь с аккаунтом в интеграции амо*/
|
||||||
Accountid string `json:"AccountID"`
|
Accountid int `json:"AccountID" bson:"accountid"`
|
||||||
/* - айдишник шага воронки в амо*/
|
/* - айдишник шага воронки в амо*/
|
||||||
Amoid int `json:"AmoID"`
|
Amoid int `json:"AmoID" bson:"amoid"`
|
||||||
/* - цвет шага в амо*/
|
/* - цвет шага в амо*/
|
||||||
Color string `json:"Color"`
|
Color string `json:"Color" bson:"color"`
|
||||||
/* - таймштамп создания воронки в нашей системе*/
|
/* - таймштамп создания воронки в нашей системе*/
|
||||||
Createdat int `json:"CreatedAt"`
|
Createdat int64 `json:"CreatedAt" bson:"createdat"`
|
||||||
|
// время обновления
|
||||||
|
UpdateAt int64 `json:"UpdateAt" bson:"updateAt"`
|
||||||
/* - флаг мягкого удаления*/
|
/* - флаг мягкого удаления*/
|
||||||
Deleted bool `json:"Deleted"`
|
Deleted bool `json:"Deleted" bson:"deleted"`
|
||||||
/* - айдишник в нашей системе*/
|
/* - айдишник в нашей системе*/
|
||||||
ID int `json:"ID"`
|
ID string `json:"ID" bson:"ID"`
|
||||||
/* - название воронки в амо*/
|
/* - название воронки в амо*/
|
||||||
Name string `json:"Name"`
|
Name string `json:"Name" bson:"name"`
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,7 @@ type Deps struct {
|
|||||||
MdbUser *mongo.Collection
|
MdbUser *mongo.Collection
|
||||||
Tokens *mongo.Collection
|
Tokens *mongo.Collection
|
||||||
Pipelines *mongo.Collection
|
Pipelines *mongo.Collection
|
||||||
|
Steps *mongo.Collection
|
||||||
Logger *zap.Logger
|
Logger *zap.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -16,6 +17,7 @@ type Repository struct {
|
|||||||
mdbUser *mongo.Collection
|
mdbUser *mongo.Collection
|
||||||
tokens *mongo.Collection
|
tokens *mongo.Collection
|
||||||
pipelines *mongo.Collection
|
pipelines *mongo.Collection
|
||||||
|
steps *mongo.Collection
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -24,6 +26,7 @@ func NewRepository(deps Deps) *Repository {
|
|||||||
mdbUser: deps.MdbUser,
|
mdbUser: deps.MdbUser,
|
||||||
tokens: deps.Tokens,
|
tokens: deps.Tokens,
|
||||||
pipelines: deps.Pipelines,
|
pipelines: deps.Pipelines,
|
||||||
|
steps: deps.Steps,
|
||||||
logger: deps.Logger,
|
logger: deps.Logger,
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -2,7 +2,11 @@ package repository
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"amocrm/internal/models"
|
"amocrm/internal/models"
|
||||||
|
"amocrm/internal/models/amo"
|
||||||
"context"
|
"context"
|
||||||
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (r *Repository) GettingStepsFromCash(ctx context.Context) (*models.UserListStepsResp, error) {
|
func (r *Repository) GettingStepsFromCash(ctx context.Context) (*models.UserListStepsResp, error) {
|
||||||
@ -17,3 +21,69 @@ func (r *Repository) UpdateListSteps(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Repository) CheckSteps(ctx context.Context, accountID string, steps []amo.Statuses) error {
|
||||||
|
for _, s := range steps {
|
||||||
|
step := models.Step{
|
||||||
|
ID: accountID,
|
||||||
|
Pipelineid: s.PipelineID,
|
||||||
|
Name: s.Name,
|
||||||
|
Accountid: s.AccountID,
|
||||||
|
Amoid: s.ID,
|
||||||
|
Color: s.Color,
|
||||||
|
}
|
||||||
|
|
||||||
|
existingStep, err := r.GetStepByID(ctx, accountID, s.ID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if existingStep != nil {
|
||||||
|
step.UpdateAt = time.Now().Unix()
|
||||||
|
err := r.UpdateStep(ctx, &step)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
step.Createdat = time.Now().Unix()
|
||||||
|
err := r.InsertStep(ctx, &step)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Repository) GetStepByID(ctx context.Context, accountID string, amoid int) (*models.Step, error) {
|
||||||
|
var step models.Step
|
||||||
|
filter := bson.M{"id": accountID, "amoid": amoid}
|
||||||
|
err := r.steps.FindOne(ctx, filter).Decode(&step)
|
||||||
|
if err == mongo.ErrNoDocuments {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &step, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Repository) UpdateStep(ctx context.Context, step *models.Step) error {
|
||||||
|
filter := bson.M{"id": step.ID, "amoid": step.Amoid}
|
||||||
|
update := bson.M{"$set": bson.M{
|
||||||
|
"accountid": step.Accountid,
|
||||||
|
"name": step.Name,
|
||||||
|
"updateAt": step.UpdateAt,
|
||||||
|
"deleted": step.Deleted,
|
||||||
|
"color": step.Color,
|
||||||
|
"pipelineid": step.Pipelineid,
|
||||||
|
}}
|
||||||
|
_, err := r.steps.UpdateOne(ctx, filter, update)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Repository) InsertStep(ctx context.Context, step *models.Step) error {
|
||||||
|
_, err := r.steps.InsertOne(ctx, step)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
@ -61,6 +61,19 @@ func (wc *DataUpdater) processTasks(ctx context.Context) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
wc.logger.Error("error update pipelines in mongo:", zap.Error(err))
|
wc.logger.Error("error update pipelines in mongo:", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, pipeline := range pipelines.Embedded.Pipelines {
|
||||||
|
steps, err := wc.amoClient.GetListSteps(pipeline.ID, token.AccessToken)
|
||||||
|
if err != nil {
|
||||||
|
wc.logger.Error("error getting list steps pipeline:", zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err = wc.repo.CheckSteps(ctx, token.AccountID, steps.Embedded.Statuses)
|
||||||
|
if err != nil {
|
||||||
|
wc.logger.Error("error update pipeline steps in mongo:", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -511,7 +511,7 @@ components:
|
|||||||
description: объект шага воронки амо
|
description: объект шага воронки амо
|
||||||
properties:
|
properties:
|
||||||
ID:
|
ID:
|
||||||
type: integer
|
type: string
|
||||||
description: айдишник в нашей системе
|
description: айдишник в нашей системе
|
||||||
AmoID:
|
AmoID:
|
||||||
type: integer
|
type: integer
|
||||||
|
@ -124,7 +124,7 @@ func (a *Amo) CreateWebHook(req amo2.WebHookRequest) (*amo2.CreateWebHookResp, e
|
|||||||
}
|
}
|
||||||
|
|
||||||
if statusCode != fiber.StatusOK {
|
if statusCode != fiber.StatusOK {
|
||||||
errorMessage := fmt.Sprintf("received an incorrect response from CreateWebHook: %d", statusCode)
|
errorMessage := fmt.Sprintf("received an incorrect response from CreateWebHook: %s", string(resBody))
|
||||||
a.logger.Error(errorMessage, zap.Int("status", statusCode))
|
a.logger.Error(errorMessage, zap.Int("status", statusCode))
|
||||||
return nil, fmt.Errorf(errorMessage)
|
return nil, fmt.Errorf(errorMessage)
|
||||||
}
|
}
|
||||||
@ -155,12 +155,12 @@ func (a *Amo) DeleteWebHook() {
|
|||||||
|
|
||||||
// https://www.amocrm.ru/developers/content/crm_platform/leads_pipelines#%D0%A1%D0%BF%D0%B8%D1%81%D0%BE%D0%BA-%D1%81%D1%82%D0%B0%D1%82%D1%83%D1%81%D0%BE%D0%B2-%D0%B2%D0%BE%D1%80%D0%BE%D0%BD%D0%BA%D0%B8-%D1%81%D0%B4%D0%B5%D0%BB%D0%BE%D0%BA
|
// https://www.amocrm.ru/developers/content/crm_platform/leads_pipelines#%D0%A1%D0%BF%D0%B8%D1%81%D0%BE%D0%BA-%D1%81%D1%82%D0%B0%D1%82%D1%83%D1%81%D0%BE%D0%B2-%D0%B2%D0%BE%D1%80%D0%BE%D0%BD%D0%BA%D0%B8-%D1%81%D0%B4%D0%B5%D0%BB%D0%BE%D0%BA
|
||||||
// GET /api/v4/leads/pipelines/{pipeline_id}/statuses
|
// GET /api/v4/leads/pipelines/{pipeline_id}/statuses
|
||||||
func (a *Amo) GetListSteps(pipelineID int) (*amo2.ResponseGetListSteps, error) {
|
func (a *Amo) GetListSteps(pipelineID int, accessToken string) (*amo2.ResponseGetListSteps, error) {
|
||||||
for {
|
for {
|
||||||
if a.rateLimiter.Check() {
|
if a.rateLimiter.Check() {
|
||||||
uri := fmt.Sprintf("%s/api/v4/leads/pipelines/%d/statuses", a.baseApiURL, pipelineID)
|
uri := fmt.Sprintf("%s/api/v4/leads/pipelines/%d/statuses", a.baseApiURL, pipelineID)
|
||||||
|
|
||||||
agent := a.fiberClient.Get(uri)
|
agent := a.fiberClient.Get(uri)
|
||||||
|
agent.Set("Authorization", "Bearer "+accessToken)
|
||||||
statusCode, resBody, errs := agent.Bytes()
|
statusCode, resBody, errs := agent.Bytes()
|
||||||
if len(errs) > 0 {
|
if len(errs) > 0 {
|
||||||
for _, err := range errs {
|
for _, err := range errs {
|
||||||
@ -170,7 +170,7 @@ func (a *Amo) GetListSteps(pipelineID int) (*amo2.ResponseGetListSteps, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if statusCode != fiber.StatusOK {
|
if statusCode != fiber.StatusOK {
|
||||||
errorMessage := fmt.Sprintf("received an incorrect response from GetListSteps: %d", statusCode)
|
errorMessage := fmt.Sprintf("received an incorrect response from GetListSteps: %s", string(resBody))
|
||||||
a.logger.Error(errorMessage, zap.Int("status", statusCode))
|
a.logger.Error(errorMessage, zap.Int("status", statusCode))
|
||||||
return nil, fmt.Errorf(errorMessage)
|
return nil, fmt.Errorf(errorMessage)
|
||||||
}
|
}
|
||||||
@ -211,7 +211,7 @@ func (a *Amo) GetListFields(req amo2.GetListFieldsReq, url string) (*amo2.Respon
|
|||||||
}
|
}
|
||||||
|
|
||||||
if statusCode != fiber.StatusOK {
|
if statusCode != fiber.StatusOK {
|
||||||
errorMessage := fmt.Sprintf("received an incorrect response from GetListFields: %d", statusCode)
|
errorMessage := fmt.Sprintf("received an incorrect response from GetListFields: %s", string(resBody))
|
||||||
a.logger.Error(errorMessage, zap.Int("status", statusCode))
|
a.logger.Error(errorMessage, zap.Int("status", statusCode))
|
||||||
return nil, fmt.Errorf(errorMessage)
|
return nil, fmt.Errorf(errorMessage)
|
||||||
}
|
}
|
||||||
@ -260,7 +260,7 @@ func (a *Amo) GetListTags(req amo2.GetListTagsReq) (*amo2.ResponseGetListTags, e
|
|||||||
}
|
}
|
||||||
|
|
||||||
if statusCode != fiber.StatusOK {
|
if statusCode != fiber.StatusOK {
|
||||||
errorMessage := fmt.Sprintf("received an incorrect response from GetListTags: %d", statusCode)
|
errorMessage := fmt.Sprintf("received an incorrect response from GetListTags: %s", string(resBody))
|
||||||
a.logger.Error(errorMessage, zap.Int("status", statusCode))
|
a.logger.Error(errorMessage, zap.Int("status", statusCode))
|
||||||
return nil, fmt.Errorf(errorMessage)
|
return nil, fmt.Errorf(errorMessage)
|
||||||
}
|
}
|
||||||
@ -292,7 +292,7 @@ func (a *Amo) GetUserInfo(accessToken string) (*amo2.AmocrmUserInformation, erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
if statusCode != fiber.StatusOK {
|
if statusCode != fiber.StatusOK {
|
||||||
errorMessage := fmt.Sprintf("received an incorrect response from GetUserInfo: %d", statusCode)
|
errorMessage := fmt.Sprintf("received an incorrect response from GetUserInfo: %s", string(resBody))
|
||||||
a.logger.Error(errorMessage, zap.Int("status", statusCode))
|
a.logger.Error(errorMessage, zap.Int("status", statusCode))
|
||||||
return nil, fmt.Errorf(errorMessage)
|
return nil, fmt.Errorf(errorMessage)
|
||||||
}
|
}
|
||||||
@ -325,7 +325,7 @@ func (a *Amo) GetUserByID(accessToken string, id int64) (*amo2.OneUserInfo, erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
if statusCode != fiber.StatusOK {
|
if statusCode != fiber.StatusOK {
|
||||||
errorMessage := fmt.Sprintf("received an incorrect response from GetUserByID: %d", statusCode)
|
errorMessage := fmt.Sprintf("received an incorrect response from GetUserByID:%s", string(resBody))
|
||||||
a.logger.Error(errorMessage, zap.Int("status", statusCode))
|
a.logger.Error(errorMessage, zap.Int("status", statusCode))
|
||||||
return nil, fmt.Errorf(errorMessage)
|
return nil, fmt.Errorf(errorMessage)
|
||||||
}
|
}
|
||||||
@ -358,7 +358,7 @@ func (a *Amo) GetListPipelines(accessToken string) (*amo2.PipelineResponse, erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
if statusCode != fiber.StatusOK {
|
if statusCode != fiber.StatusOK {
|
||||||
errorMessage := fmt.Sprintf("received an incorrect response from GetListPipelines: %d", statusCode)
|
errorMessage := fmt.Sprintf("received an incorrect response from GetListPipelines: %s", string(resBody))
|
||||||
a.logger.Error(errorMessage, zap.Int("status", statusCode))
|
a.logger.Error(errorMessage, zap.Int("status", statusCode))
|
||||||
return nil, fmt.Errorf(errorMessage)
|
return nil, fmt.Errorf(errorMessage)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user