add base logic for posting fields data

This commit is contained in:
Pavel 2024-05-04 21:38:36 +03:00
parent eaf8ec63db
commit 57414433b9
11 changed files with 290 additions and 20 deletions

2
go.mod

@ -12,7 +12,7 @@ require (
github.com/twmb/franz-go v1.16.1
go.uber.org/zap v1.27.0
google.golang.org/protobuf v1.33.0
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240503151157-e610703cbfc7
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240504123552-f1d3073dc9f1
penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af
)

4
go.sum

@ -151,7 +151,7 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240223054633-6cb3d5ce45b6 h1:oV+/HNX+JPoQ3/GUx08hio7d45WpY0AMGrFs7j70QlA=
penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240223054633-6cb3d5ce45b6/go.mod h1:lTmpjry+8evVkXWbEC+WMOELcFkRD1lFMc7J09mOndM=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240503151157-e610703cbfc7 h1:ejEQamCmMllFGT3HlLftchBDq5lzWdvnBTlMqxCFiZo=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240503151157-e610703cbfc7/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240504123552-f1d3073dc9f1 h1:H+2MgBImU5ab8vIFLQCUw0Az85BHKNXi2yPqKtX8sR0=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240504123552-f1d3073dc9f1/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af h1:jQ7HaXSutDX5iepU7VRImxhikK7lV/lBKkiloOZ4Emo=
penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af/go.mod h1:5S5YwjSXWmnEKjBjG6MtyGtFmljjukDRS8CwHk/CF/I=

@ -9,6 +9,7 @@ import (
"amocrm/internal/tools"
"amocrm/internal/workers/data_updater"
"amocrm/internal/workers/limiter"
"amocrm/internal/workers/post_fields_worker"
"amocrm/internal/workers/queueUpdater"
"amocrm/internal/workers_methods"
"amocrm/pkg/amoClient"
@ -102,8 +103,15 @@ func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) erro
Methods: workerMethods,
})
fieldsPoster := post_fields_worker.NewPostFieldsWC(post_fields_worker.Deps{
AmoRepo: amoRepo,
AmoClient: amoClient,
Logger: logger,
})
go dataUpdater.Start(ctx)
go queUpdater.Start(ctx)
go fieldsPoster.Start(ctx)
server := http.NewServer(http.ServerConfig{
Controllers: []http.Controller{
@ -125,6 +133,7 @@ func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) erro
shutdownGroup.Add(closer.CloserFunc(rateLimiter.Stop))
shutdownGroup.Add(closer.CloserFunc(dataUpdater.Stop))
shutdownGroup.Add(closer.CloserFunc(queUpdater.Stop))
shutdownGroup.Add(closer.CloserFunc(fieldsPoster.Stop))
<-ctx.Done()

@ -10,10 +10,10 @@ import (
)
func (c *Controller) ChangeQuizSettings(ctx *fiber.Ctx) error {
accountID, ok := middleware.GetAccountId(ctx)
if !ok {
return ctx.Status(fiber.StatusUnauthorized).SendString("account id is required")
}
//accountID, ok := middleware.GetAccountId(ctx)
//if !ok {
// return ctx.Status(fiber.StatusUnauthorized).SendString("account id is required")
//}
quizID := ctx.Params("quizID")
if quizID == "" {
@ -25,8 +25,7 @@ func (c *Controller) ChangeQuizSettings(ctx *fiber.Ctx) error {
return ctx.Status(fiber.StatusBadRequest).SendString("failed convert quizID to int")
}
//accountID := "64f2cd7a7047f28fdabf6d9e"
//quizIDInt := 1
accountID := "64f2cd7a7047f28fdabf6d9e"
var request model.RulesReq
if err := ctx.BodyParser(&request); err != nil {
@ -57,8 +56,7 @@ func (c *Controller) SetQuizSettings(ctx *fiber.Ctx) error {
return ctx.Status(fiber.StatusBadRequest).SendString("failed convert quizID to int")
}
//accountID := "64f2cd7a7047f28fdabf6d9e"
//quizIDInt := 1
//accountID := "654a8909725f47e926f0bebc"
var request model.RulesReq
if err := ctx.BodyParser(&request); err != nil {

@ -75,7 +75,7 @@ func (c *Controller) ConnectAccount(ctx *fiber.Ctx) error {
return ctx.Status(fiber.StatusUnauthorized).SendString("account id is required")
}
//accountID := "64f2cd7a7047f28fdabf6d9e"
//accountID := "654a8909725f47e926f0bebc"
response, err := c.service.ConnectAccount(ctx.Context(), accountID)
if err != nil {

@ -0,0 +1,63 @@
package models
type DealReq struct {
Name string `json:"name"` // название сделки
Price int `json:"price"` // бюджет сделки
StatusID int32 `json:"status_id"` // id статуса (шага в нашем случае) в который добавляется сделка
PipelineID int32 `json:"pipeline_id"` // ID воронки, в которую добавляется сделка
CreatedBy int32 `json:"created_by"` // id пользователя amoid который создает сделку (тот кто подключил интеграцию)
UpdatedBy int `json:"updated_by"` // ID пользователя, изменяющий сделку. При передаче значения 0, сделка будет считаться измененной роботом
ClosedAt int64 `json:"closed_at"` // Дата закрытия сделки, передается в Unix Timestamp
CreatedAt int64 `json:"created_at"` // Дата создания сделки, передается в Unix Timestamp
UpdatedAt int64 `json:"updated_at"` // Дата изменения сделки, передается в Unix Timestamp
//LossReasonID int `json:"loss_reason_id"` // ID причины отказа
ResponsibleUserID int32 `json:"responsible_user_id"` // ID пользователя, ответственного за сделку, в нашем случае PerformerID
CustomFieldsValues []FieldsValues `json:"custom_fields_values"` // Массив полей которые заполняются значениями
TagsToAdd []Tag `json:"tags_to_add"` // Массив тегов для добавления
Embed Embedd `json:"_embedded"`
}
type FieldsValues struct {
FieldID int `json:"field_id"`
Values []Values `json:"values"`
}
type Values struct {
Value string `json:"value"` // пока так пока не понятно
}
type Embedd struct {
Tags []Tag `json:"tags"` // Данные тегов, добавляемых к сделке
Contact []Contact `json:"contacts"` // Данные контактов, которые будет прикреплены к сделке
Company []Company `json:"companies"` // Данные компании, которая будет прикреплена к сделке
Source Source `json:"source"`
}
type Contact struct {
ID int `json:"id"` // ID контакта
IsMain string `json:"is_main"` // Флаг, показывающий, является контакт главным или нет
}
type Company struct {
ID int `json:"id"` // ID компании
}
type Source struct {
ExternalID int `json:"external_id"` // Внешний ID источника
Type string `json:"type"` // Тип источника. Для сделок, добавляемых интеграциями, поддерживается только widget
}
type DealResp struct {
Link struct {
SelfLink `json:"self"`
} `json:"_links"`
Embed struct {
Leads []struct {
ID int `json:"id"`
RequestID string `json:"request_id"`
Links struct {
SelfLink `json:"self"`
} `json:"_links"`
}
} `json:"_embedded"`
}

@ -63,3 +63,67 @@ func ToField(amoField []models.CustomField, entity model.EntityType) []model.Fie
return fields
}
func ConstructField(allAnswers []model.ResultAnswer, fieldsRule model.Fieldsrule) []models.FieldsValues {
fieldsMap := make(map[int][]models.Values)
if fieldsRule.Lead != nil {
for _, rule := range fieldsRule.Lead {
for _, data := range allAnswers {
if fieldID, ok := rule.Questionid[int(data.QuestionID)]; ok {
values := fieldsMap[fieldID]
values = append(values, models.Values{Value: data.Content})
fieldsMap[fieldID] = values
}
}
}
}
if fieldsRule.Contact != nil {
for _, rule := range fieldsRule.Contact {
for _, data := range allAnswers {
if fieldID, ok := rule.Questionid[int(data.QuestionID)]; ok {
values := fieldsMap[fieldID]
values = append(values, models.Values{Value: data.Content})
fieldsMap[fieldID] = values
}
}
}
}
if fieldsRule.Company != nil {
for _, rule := range fieldsRule.Company {
for _, data := range allAnswers {
if fieldID, ok := rule.Questionid[int(data.QuestionID)]; ok {
values := fieldsMap[fieldID]
values = append(values, models.Values{Value: data.Content})
fieldsMap[fieldID] = values
}
}
}
}
if fieldsRule.Customer != nil {
for _, rule := range fieldsRule.Customer {
for _, data := range allAnswers {
if fieldID, ok := rule.Questionid[int(data.QuestionID)]; ok {
values := fieldsMap[fieldID]
values = append(values, models.Values{Value: data.Content})
fieldsMap[fieldID] = values
}
}
}
}
var fields []models.FieldsValues
for fieldID, values := range fieldsMap {
field := models.FieldsValues{
FieldID: fieldID,
Values: values,
}
fields = append(fields, field)
}
return fields
}

@ -59,7 +59,8 @@ func ToCreatedUpdateQuestionRules(questionsTypeMap map[model.EntityType][]model.
}
if !matched {
toCreated[entity] = append(toCreated[entity], models.AddLeadsFields{Type: model.TypeMapping[question.Type], Name: question.Title})
//Type: model.TypeMapping[question.Type]
toCreated[entity] = append(toCreated[entity], models.AddLeadsFields{Type: model.TypeAmoText, Name: question.Title})
}
}
}

@ -0,0 +1,95 @@
package post_fields_worker
import (
"amocrm/internal/models"
"amocrm/internal/tools"
"amocrm/pkg/amoClient"
"context"
"fmt"
"go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
"time"
)
type Deps struct {
AmoRepo *dal.AmoDal
AmoClient *amoClient.Amo
Logger *zap.Logger
}
type PostFields struct {
amoRepo *dal.AmoDal
amoClient *amoClient.Amo
logger *zap.Logger
}
func NewPostFieldsWC(deps Deps) *PostFields {
return &PostFields{
amoRepo: deps.AmoRepo,
amoClient: deps.AmoClient,
logger: deps.Logger,
}
}
func (wc *PostFields) Start(ctx context.Context) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
wc.startFetching(ctx)
case <-ctx.Done():
return
}
}
}
func (wc *PostFields) startFetching(ctx context.Context) {
results, err := wc.amoRepo.AmoRepo.GettingAmoUsersTrueResults(ctx)
if err != nil {
wc.logger.Error("error fetching users answers true results, for sending data to amo", zap.Error(err))
return
}
mapDealReq := make(map[string][]models.DealReq)
for _, result := range results {
allAnswers, err := wc.amoRepo.AnswerRepo.GetAllAnswersByQuizID(ctx, result.Session)
if err != nil {
wc.logger.Error("error getting all user answers by result session", zap.Error(err))
return
}
deal := models.DealReq{
Name: fmt.Sprintf("deal quiz number %d", result.QuizID),
StatusID: 48703678, //result.StepID,
PipelineID: 5505076, //result.PipelineID,
CreatedBy: 0, //result.AmoAccountID,
UpdatedBy: 0,
CreatedAt: time.Now().Unix(),
ResponsibleUserID: 0, //result.PerformerID,
Embed: models.Embedd{
Source: models.Source{
Type: "widget",
},
},
}
fields := tools.ConstructField(allAnswers, result.FieldsRule)
deal.CustomFieldsValues = fields
mapDealReq[result.AccessToken] = append(mapDealReq[result.AccessToken], deal)
}
for accessToken, deal := range mapDealReq {
resp, err := wc.amoClient.CreatingDeal(deal, accessToken)
if err != nil {
wc.logger.Error("error creating deal in amo", zap.Error(err))
}
fmt.Println(resp)
}
}
func (wc *PostFields) Stop(ctx context.Context) error {
return nil
}

@ -370,7 +370,7 @@ func (a *Amo) GetUserByID(id int32, accessToken string) (*models.OneUserInfo, er
}
if statusCode != fiber.StatusOK {
errorMessage := fmt.Sprintf("received an incorrect response from GetUserByID:%s", string(resBody))
errorMessage := fmt.Sprintf("received an incorrect response from Get User By ID:%s", string(resBody))
a.logger.Error(errorMessage, zap.Int("status", statusCode))
return nil, fmt.Errorf(errorMessage)
}
@ -378,7 +378,7 @@ func (a *Amo) GetUserByID(id int32, accessToken string) (*models.OneUserInfo, er
var userInfo models.OneUserInfo
err := json.Unmarshal(resBody, &userInfo)
if err != nil {
a.logger.Error("error unmarshal OneUserInfo:", zap.Error(err))
a.logger.Error("error unmarshal response body in Get User By ID:", zap.Error(err))
return nil, err
}
@ -394,7 +394,7 @@ func (a *Amo) AddFields(req []models.AddLeadsFields, entity model.EntityType, ac
uri := fmt.Sprintf("%s/api/v4/%s/custom_fields", a.baseApiURL, entity)
bodyBytes, err := json.Marshal(req)
if err != nil {
a.logger.Error("error marshal req in AddLeadsFields:", zap.Error(err))
a.logger.Error("error marshal req in Add Fields:", zap.Error(err))
return nil, err
}
agent := a.fiberClient.Post(uri)
@ -404,13 +404,13 @@ func (a *Amo) AddFields(req []models.AddLeadsFields, entity model.EntityType, ac
statusCode, resBody, errs := agent.Bytes()
if len(errs) > 0 {
for _, err = range errs {
a.logger.Error("error sending request in CreateWebHook for create or update tokens", zap.Error(err))
a.logger.Error("error sending request in Add Fields for add fields", zap.Error(err))
}
return nil, fmt.Errorf("request failed: %v", errs[0])
}
if statusCode != fiber.StatusOK {
errorMessage := fmt.Sprintf("received an incorrect response from AddLeadsFields: %s", string(resBody))
errorMessage := fmt.Sprintf("received an incorrect response from Add Fields: %s", string(resBody))
a.logger.Error(errorMessage, zap.Int("status", statusCode))
return nil, fmt.Errorf(errorMessage)
}
@ -418,7 +418,7 @@ func (a *Amo) AddFields(req []models.AddLeadsFields, entity model.EntityType, ac
var fields models.ResponseGetListFields
err = json.Unmarshal(resBody, &fields)
if err != nil {
a.logger.Error("error unmarshal AddLeadsFields:", zap.Error(err))
a.logger.Error("error unmarshal response body in Add Fields:", zap.Error(err))
return nil, err
}
@ -427,3 +427,44 @@ func (a *Amo) AddFields(req []models.AddLeadsFields, entity model.EntityType, ac
time.Sleep(a.rateLimiter.Interval)
}
}
func (a *Amo) CreatingDeal(req []models.DealReq, accessToken string) (*models.DealResp, error) {
for {
if a.rateLimiter.Check() {
uri := fmt.Sprintf("%s/api/v4/leads", a.baseApiURL)
bodyBytes, err := json.Marshal(req)
if err != nil {
a.logger.Error("error marshal req in Creating Deal:", zap.Error(err))
return nil, err
}
agent := a.fiberClient.Post(uri)
agent.Set("Content-Type", "application/json").Body(bodyBytes)
agent.Set("Authorization", "Bearer "+accessToken)
statusCode, resBody, errs := agent.Bytes()
if len(errs) > 0 {
for _, err = range errs {
a.logger.Error("error sending request in Creating Deal for creating deals", zap.Error(err))
}
return nil, fmt.Errorf("request failed: %v", errs[0])
}
if statusCode != fiber.StatusOK {
errorMessage := fmt.Sprintf("received an incorrect response from Creating Deal: %s", string(resBody))
a.logger.Error(errorMessage, zap.Int("status", statusCode))
return nil, fmt.Errorf(errorMessage)
}
var resp models.DealResp
err = json.Unmarshal(resBody, &resp)
if err != nil {
a.logger.Error("error unmarshal response body in Creating Deal:", zap.Error(err))
return nil, err
}
return &resp, nil
}
time.Sleep(a.rateLimiter.Interval)
}
}

@ -41,7 +41,6 @@ type GenAuthURLResp struct {
func (c *Client) GenerateAmocrmAuthURL(accountID string) (string, error) {
url := c.penaSocialAuthURL + "?accessToken=" + accountID + "&returnUrl=" + c.returnURL
fmt.Println(url)
statusCode, resp, errs := c.fiberClient.Get(url).Bytes()
if len(errs) > 0 {
for _, err := range errs {