// Package mongo implements the MongoDB-backed data access layer (DAL) for the
// "messages", "tickets" and "account" collections.
package mongo

import (
	"context"
	"errors"
	"fmt"
	"heruvym/model"
	"time"

	"github.com/rs/xid"
	"github.com/themakers/hlog"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// Names of the collections used by the DAL.
const (
	collMessages = "messages"
	collTickets  = "tickets"
	collAccount  = "account"
)

// cursorCloseTimeout bounds the best-effort server-side cleanup performed when
// a cursor or change stream is released.
const cursorCloseTimeout = 5 * time.Second

// DepsDAL carries the dependencies required to construct a DAL.
type DepsDAL struct {
	MongoDatabase *mongo.Database
	HLogger       hlog.Logger
}

// DAL is the data access layer over the messages, tickets and account
// collections of a single Mongo database.
type DAL struct {
	logger                 hlog.Logger
	colMsg, colTck, colAcc *mongo.Collection
	client                 *mongo.Client
}

// ErrorConnectToDB is a log event describing a failed connection attempt.
type ErrorConnectToDB struct {
	Err      error
	MongoURI string
}

// ErrorPingDB is a log event describing a failed ping.
type ErrorPingDB struct {
	Err      error
	MongoURI string
}

// InfoPing is a log event describing a ping and its duration in nanoseconds.
type InfoPing struct {
	MongoURI    string
	Nanoseconds int64
}

// New builds a DAL on top of deps.MongoDatabase and creates the text indexes
// on the tickets, messages and account collections (in that order). The first
// index creation failure aborts construction and is returned.
func New(ctx context.Context, deps DepsDAL) (*DAL, error) {
	dal := &DAL{
		client: deps.MongoDatabase.Client(),
		colMsg: deps.MongoDatabase.Collection(collMessages),
		colTck: deps.MongoDatabase.Collection(collTickets),
		colAcc: deps.MongoDatabase.Collection(collAccount),
		logger: deps.HLogger.Module("DAL"),
	}

	// Index creation is only expected to fail when an index definition was
	// changed without dropping the previously created index first.
	if err := dal.CreateTicketIndex(ctx); err != nil {
		return nil, err
	}
	if err := dal.CreateMessageIndex(ctx); err != nil {
		return nil, err
	}
	if err := dal.CreateAccountIndex(ctx); err != nil {
		return nil, err
	}

	return dal, nil
}

// ErrorInsert is a log event emitted when inserting a message or ticket fails.
type ErrorInsert struct {
	Err               error
	UserID, SessionID string
}

// ErrorCreateIndex is a log event emitted when index creation fails.
type ErrorCreateIndex struct {
	Err error
}

// createTextIndex creates a wildcard ("$**") text index with "russian" as the
// default language on coll. Failures are logged and returned.
func (d *DAL) createTextIndex(ctx context.Context, coll *mongo.Collection) error {
	index := mongo.IndexModel{
		Keys:    bson.D{{"$**", "text"}},
		Options: options.Index().SetDefaultLanguage("russian"),
	}

	if _, err := coll.Indexes().CreateOne(ctx, index); err != nil {
		d.logger.Emit(ErrorCreateIndex{err})
		return err
	}

	return nil
}

// CreateMessageIndex creates the wildcard text index on the messages collection.
func (d *DAL) CreateMessageIndex(ctx context.Context) error {
	return d.createTextIndex(ctx, d.colMsg)
}

// CreateTicketIndex creates the wildcard text index on the tickets collection.
func (d *DAL) CreateTicketIndex(ctx context.Context) error {
	return d.createTextIndex(ctx, d.colTck)
}

// CreateAccountIndex creates the wildcard text index on the account collection.
func (d *DAL) CreateAccountIndex(ctx context.Context) error {
	return d.createTextIndex(ctx, d.colAcc)
}

// insertMessage stores msg in the messages collection. On failure it emits an
// ErrorInsert event and returns the driver error; on success it returns msg.
func (d *DAL) insertMessage(ctx context.Context, msg *model.Message) (*model.Message, error) {
	if _, err := d.colMsg.InsertOne(ctx, msg); err != nil {
		d.logger.Emit(ErrorInsert{
			Err:       err,
			UserID:    msg.UserID,
			SessionID: msg.SessionID,
		})
		return nil, err
	}

	return msg, nil
}

// PutMessage stores a new message for ticketID with a freshly generated ID and
// the current time as CreatedAt, and returns the stored message.
func (d *DAL) PutMessage(
	ctx context.Context,
	message, userID, sessionID, ticketID string,
	files []string,
) (*model.Message, error) {
	return d.insertMessage(ctx, &model.Message{
		ID:        xid.New().String(),
		UserID:    userID,
		SessionID: sessionID,
		TicketID:  ticketID,
		Message:   message,
		Files:     files,
		Shown:     map[string]int{},
		CreatedAt: time.Now(),
	})
}

// PutSCResponse stores a message whose RequestScreenshot field is "response".
// Apart from that marker it behaves like PutMessage.
func (d *DAL) PutSCResponse(
	ctx context.Context,
	message, userID, sessionID, ticketID string,
	files []string,
) (*model.Message, error) {
	return d.insertMessage(ctx, &model.Message{
		ID:                xid.New().String(),
		UserID:            userID,
		SessionID:         sessionID,
		TicketID:          ticketID,
		Message:           message,
		Files:             files,
		RequestScreenshot: "response",
		Shown:             map[string]int{},
		CreatedAt:         time.Now(),
	})
}

// PutSCRequest stores an empty message (no text, no files) whose
// RequestScreenshot field is "acquisition".
func (d *DAL) PutSCRequest(
	ctx context.Context,
	userID, sessionID, ticketID string,
) (*model.Message, error) {
	return d.insertMessage(ctx, &model.Message{
		ID:                xid.New().String(),
		UserID:            userID,
		SessionID:         sessionID,
		TicketID:          ticketID,
		Message:           "",
		Files:             []string{},
		Shown:             map[string]int{},
		RequestScreenshot: "acquisition",
		CreatedAt:         time.Now(),
	})
}

// CreateTicket inserts a new open ticket (Rate -1) whose TopMessage carries
// message, and returns the generated ticket ID. The ticket and its top message
// share one creation timestamp.
//
// NOTE(review): files is accepted but not stored; TopMessage.Files is always
// empty. This matches the previous behaviour — confirm whether it is intended.
func (d *DAL) CreateTicket(
	ctx context.Context,
	userID,
	sessionID,
	origin,
	title, message string,
	files []string,
) (string, error) {
	ticketID := xid.New().String()
	now := time.Now()

	ticket := &model.Ticket{
		ID:        ticketID,
		UserID:    userID,
		SessionID: sessionID,
		Title:     title,
		State:     model.StateOpen,
		CreatedAt: now,
		UpdatedAt: now,
		Rate:      -1,
		Origin:    origin,
		TopMessage: model.Message{
			ID:        xid.New().String(),
			UserID:    userID,
			SessionID: sessionID,
			TicketID:  ticketID,
			Message:   message,
			Files:     []string{},
			Shown:     map[string]int{},
			CreatedAt: now,
		},
	}

	if _, err := d.colTck.InsertOne(ctx, ticket); err != nil {
		d.logger.Emit(ErrorInsert{
			Err:       err,
			UserID:    userID,
			SessionID: sessionID,
		})
		return "", err
	}

	return ticketID, nil
}

// GetTickets4Sess returns every ticket of session sessID, sorted by State
// descending and then UpdatedAt ascending, together with the number of
// matching tickets.
func (d *DAL) GetTickets4Sess(ctx context.Context, sessID string) ([]model.Ticket, int64, error) {
	// The sort specification must be an ordered document: a multi-key map has
	// no defined key order and is rejected by the driver.
	sort := bson.D{{"State", -1}, {"UpdatedAt", 1}}
	query := bson.M{
		"SessionID": sessID,
	}

	cur, err := d.colTck.Find(ctx, query, options.Find().SetSort(sort))
	if err != nil {
		return nil, 0, err
	}

	var result []model.Ticket
	if err := cur.All(ctx, &result); err != nil {
		return nil, 0, err
	}

	col, err := d.colTck.CountDocuments(ctx, query)
	if err != nil {
		return nil, 0, err
	}

	return result, col, nil
}

// GetTicket4Sess returns the ticket with the given ID if it belongs to session
// sessID. The driver error (e.g. mongo.ErrNoDocuments) is returned otherwise.
func (d *DAL) GetTicket4Sess(
	ctx context.Context,
	ticketID,
	sessID string) (*model.Ticket, error) {
	var result model.Ticket

	filter := bson.M{
		"_id":       ticketID,
		"SessionID": sessID,
	}
	if err := d.colTck.FindOne(ctx, filter).Decode(&result); err != nil {
		return nil, err
	}

	return &result, nil
}

// GetTicket4User returns the ticket with the given ID if it belongs to user
// userID. The driver error (e.g. mongo.ErrNoDocuments) is returned otherwise.
func (d *DAL) GetTicket4User(
	ctx context.Context,
	ticketID,
	userID string) (*model.Ticket, error) {
	var result model.Ticket

	filter := bson.M{
		"_id":    ticketID,
		"UserID": userID,
	}
	if err := d.colTck.FindOne(ctx, filter).Decode(&result); err != nil {
		return nil, err
	}

	return &result, nil
}

// cursorCloser is satisfied by both *mongo.Cursor and *mongo.ChangeStream.
type cursorCloser interface {
	Close(ctx context.Context) error
}

// closeCursor releases c on a fresh, time-bounded context so that cleanup is
// still attempted when the caller's context has already been cancelled.
func closeCursor(c cursorCloser) {
	ctx, cancel := context.WithTimeout(context.Background(), cursorCloseTimeout)
	defer cancel()

	// Best effort: there is nothing useful the caller can do with a close error.
	_ = c.Close(ctx)
}

// insertOrUpdateOps returns the "$or" operands matching insert and update
// change events.
func insertOrUpdateOps() []bson.D {
	return []bson.D{
		{{"operationType", string(OpInsert)}},
		{{"operationType", string(OpUpdate)}},
	}
}

// watchFullDocuments opens a change stream on coll with the given pipeline and
// full-document lookup enabled, and passes the full document of every event to
// handle. It returns the first error produced by decoding or by handle;
// otherwise, once the stream stops, it returns the stream's own error (nil if
// there is none). The stream is always closed before returning.
func watchFullDocuments(
	ctx context.Context,
	coll *mongo.Collection,
	pipeline mongo.Pipeline,
	handle func(doc bson.Raw) error,
) error {
	cs, err := coll.Watch(ctx, pipeline,
		options.ChangeStream().SetFullDocument(options.UpdateLookup))
	if err != nil {
		return err
	}
	defer closeCursor(cs)

	for cs.Next(ctx) {
		var change Change
		if err := cs.Decode(&change); err != nil {
			return err
		}

		if err := handle(change.FullDocument); err != nil {
			return err
		}
	}

	return cs.Err()
}

// WatchTickets streams inserted and updated tickets that belong to userID and
// calls yield for each of them. It returns when yield fails, a document cannot
// be decoded, or the change stream stops.
func (d *DAL) WatchTickets(
	ctx context.Context,
	userID string,
	yield func(ticket model.Ticket) error) error {
	matchStage := bson.M{
		"$and": bson.A{
			bson.M{"$or": insertOrUpdateOps()},
			bson.M{"fullDocument.UserID": userID},
		},
	}
	pipeline := mongo.Pipeline{
		bson.D{{"$match", matchStage}},
	}

	return watchFullDocuments(ctx, d.colTck, pipeline, func(doc bson.Raw) error {
		var ticket model.Ticket
		if err := bson.Unmarshal(doc, &ticket); err != nil {
			return err
		}
		return yield(ticket)
	})
}

// YieldActiveTickets calls yield for at most 20 tickets whose State is
// model.StateOpen. It stops at the first decode or yield error.
func (d *DAL) YieldActiveTickets(
	ctx context.Context,
	yield func(ticket model.Ticket) error) error {
	cursor, err := d.colTck.Find(ctx, bson.M{
		"State": model.StateOpen,
	}, options.Find().SetLimit(20))
	if err != nil {
		return err
	}
	defer closeCursor(cursor)

	for cursor.Next(ctx) {
		// A fresh value per document: decoding into a reused struct could
		// leave fields of the previous document in place.
		var ticket model.Ticket
		if err := cursor.Decode(&ticket); err != nil {
			return err
		}

		if err := yield(ticket); err != nil {
			return err
		}
	}

	return cursor.Err()
}

// YieldTickets returns up to limit tickets sorted by State descending and
// UpdatedAt ascending, together with the total number of tickets.
func (d *DAL) YieldTickets(ctx context.Context, limit int64) ([]model.Ticket, int64, error) {
	sort := bson.D{{"State", -1}, {"UpdatedAt", 1}}

	cursor, err := d.colTck.Find(ctx, bson.M{}, options.Find().SetLimit(limit).SetSort(sort))
	if err != nil {
		return nil, 0, err
	}

	var result []model.Ticket
	if err := cursor.All(ctx, &result); err != nil {
		return nil, 0, err
	}

	col, err := d.colTck.CountDocuments(ctx, bson.M{})
	if err != nil {
		return nil, 0, err
	}

	return result, col, nil
}

// YieldUserTickets returns the tickets of userID sorted by State and UpdatedAt
// (both descending), skipping offset documents and returning at most limit,
// together with the total number of tickets of that user.
func (d *DAL) YieldUserTickets(ctx context.Context, userID string, limit, offset int64) ([]model.Ticket, int64, error) {
	query := bson.M{
		"UserID": userID,
	}
	sort := bson.D{{"State", -1}, {"UpdatedAt", -1}}

	// Unlike the *Page methods, offset is a document count here, not a page index.
	cursor, err := d.colTck.Find(ctx, query, options.Find().SetSort(sort).SetLimit(limit).SetSkip(offset))
	if err != nil {
		return nil, 0, err
	}

	var result []model.Ticket
	if err := cursor.All(ctx, &result); err != nil {
		return nil, 0, err
	}

	col, err := d.colTck.CountDocuments(ctx, query)
	if err != nil {
		return nil, 0, err
	}

	return result, col, nil
}

// WatchAllTickets streams every inserted or updated ticket and calls yield for
// each of them. It returns when yield fails, a document cannot be decoded, or
// the change stream stops.
func (d *DAL) WatchAllTickets(ctx context.Context, yield func(ticket model.Ticket) error) error {
	pipeline := mongo.Pipeline{
		bson.D{
			{"$match", bson.D{{"$or", insertOrUpdateOps()}}},
		},
	}

	return watchFullDocuments(ctx, d.colTck, pipeline, func(doc bson.Raw) error {
		var ticket model.Ticket
		if err := bson.Unmarshal(doc, &ticket); err != nil {
			return err
		}
		return yield(ticket)
	})
}

// UpdateTopMessage replaces the TopMessage of ticket ticketID with msg and
// sets UpdatedAt to the current time. It returns mongo.ErrNoDocuments when no
// ticket has that ID.
func (d *DAL) UpdateTopMessage(ctx context.Context, ticketID string, msg *model.Message) error {
	filter := bson.M{
		"_id": ticketID,
	}
	update := bson.M{
		"$set": bson.M{
			"UpdatedAt":  time.Now(),
			"TopMessage": msg,
		},
	}

	// Only the outcome matters; the returned pre-image is not needed.
	return d.colTck.FindOneAndUpdate(ctx, filter, update).Err()
}

// YieldMessages calls yield for the 20 most recently created messages of
// ticketID, newest first. It stops at the first decode or yield error.
func (d *DAL) YieldMessages(
	ctx context.Context,
	ticketID string,
	yield func(ticket model.Message) error) error {
	cursor, err := d.colMsg.Find(ctx, bson.M{
		"TicketID": ticketID,
	}, options.Find().SetLimit(20).SetSort(bson.D{{"CreatedAt", -1}}))
	if err != nil {
		return err
	}
	defer closeCursor(cursor)

	for cursor.Next(ctx) {
		// A fresh value per document: decoding into a reused struct could
		// leave fields of the previous document in place.
		var msg model.Message
		if err := cursor.Decode(&msg); err != nil {
			return err
		}

		if err := yield(msg); err != nil {
			return err
		}
	}

	return cursor.Err()
}

// OpType is the operationType of a change stream event.
type OpType string

// Change stream operation types.
const (
	OpInsert  OpType = "insert"
	OpDelete  OpType = "delete"
	OpReplace OpType = "replace"
	OpUpdate  OpType = "update"
)

// Change mirrors the fields of a change stream event document.
type Change struct {
	ID struct {
		Data string `bson:"_data"`
	} `bson:"_id"`
	OperationType OpType   `bson:"operationType"`
	FullDocument  bson.Raw `bson:"fullDocument"`
	NS            struct {
		DB   string `bson:"db"`
		Coll string `bson:"coll"`
	} `bson:"ns"`
	To *struct {
		DB   string `bson:"db"`
		Coll string `bson:"coll"`
	} `bson:"to"`
	DocumentKey       bson.Raw `bson:"documentKey"`
	UpdateDescription *struct {
		UpdatedFields bson.Raw `bson:"updatedFields"`
		RemovedFields []string `bson:"removedFields"`
	} `bson:"updateDescription"`
	ClusterTime primitive.Timestamp `bson:"clusterTime"`
	TxnNumber   int64               `bson:"txnNumber"`
	LSID        bson.Raw            `bson:"lsid"`
	//LSID *struct {
	//	ID  primitive.ObjectID `bson:"id"`
	//	UID primitive.Binary   `bson:"uid"`
	//} `bson:"lsid"`
}

// WatchMessages streams inserted and updated messages of ticketID and calls
// yield for each of them. It returns when yield fails, a document cannot be
// decoded, or the change stream stops.
func (d *DAL) WatchMessages(
	ctx context.Context,
	ticketID string,
	yield func(ticket model.Message) error) error {
	matchStage := bson.D{{"$and", []bson.D{
		{
			{"$or", insertOrUpdateOps()},
		},
		{
			{"fullDocument.TicketID", ticketID},
		},
	}}}
	pipeline := mongo.Pipeline{
		bson.D{{"$match", matchStage}},
	}

	return watchFullDocuments(ctx, d.colMsg, pipeline, func(doc bson.Raw) error {
		var msg model.Message
		// A document that cannot be decoded is reported instead of being
		// yielded as a zero-valued message.
		if err := bson.Unmarshal(doc, &msg); err != nil {
			return err
		}
		return yield(msg)
	})
}

// SetShown marks message messageID as shown to userID by setting
// Shown.<userID> to 1, and mirrors the flag on the ticket whose TopMessage is
// that message, if any. It returns mongo.ErrNoDocuments when the message does
// not exist.
//
// NOTE(review): userID is interpolated into a field path; this assumes it
// contains neither "." nor "$" — confirm against callers.
func (d *DAL) SetShown(ctx context.Context, messageID, userID string) error {
	msgUpdate := bson.M{
		"$set": bson.M{
			fmt.Sprintf("Shown.%s", userID): 1,
		},
	}
	if err := d.colMsg.FindOneAndUpdate(ctx, bson.M{
		"_id": messageID,
	}, msgUpdate).Err(); err != nil {
		return err
	}

	ticketUpdate := bson.M{
		"$set": bson.M{
			fmt.Sprintf("TopMessage.Shown.%s", userID): 1,
		},
	}
	err := d.colTck.FindOneAndUpdate(ctx, bson.M{
		"TopMessage._id": messageID,
	}, ticketUpdate).Err()
	// A message that is not the top message of any ticket has nothing to
	// mirror; the message itself has already been updated above.
	if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
		return err
	}

	return nil
}

// GetTicketPage returns one page of tickets, optionally filtered by state and
// by the text search srch, sorted by State and UpdatedAt (both descending).
// skip is a zero-based page index: skip*limit documents are skipped. The
// second result is the total number of tickets matching the filter.
func (d *DAL) GetTicketPage(
	ctx context.Context,
	state, srch string,
	limit, skip int64,
) (*[]model.Ticket, int64, error) {
	query := bson.M{}
	if state != "" {
		query["State"] = state
	}
	if srch != "" {
		query["$text"] = bson.M{
			"$search": srch,
		}
	}

	sort := bson.D{{"State", -1}, {"UpdatedAt", -1}}

	cur, err := d.colTck.Find(ctx, query, options.Find().SetSort(sort).SetLimit(limit).SetSkip(skip*limit))
	if err != nil {
		return nil, 0, err
	}

	var result []model.Ticket
	if err := cur.All(ctx, &result); err != nil {
		return nil, 0, err
	}

	col, err := d.colTck.CountDocuments(ctx, query)
	if err != nil {
		return nil, 0, err
	}

	return &result, col, nil
}

// GetMessagesPage returns one page of messages, newest first. offset is a
// zero-based page index: limit*offset documents are skipped. A non-empty
// ticketID restricts the result to that ticket; a non-empty search adds a text
// search.
//
// NOTE(review): when search is set the filter always includes TicketID, even
// if ticketID is empty (previous behaviour, kept as is) — confirm whether a
// search across all tickets was intended.
func (d *DAL) GetMessagesPage(ctx context.Context, search, ticketID string, limit, offset int64) ([]model.Message, error) {
	// Start from an empty (not nil) document so that "no filter" is a valid query.
	query := bson.M{}
	if ticketID != "" {
		query["TicketID"] = ticketID
	}
	if search != "" {
		query["TicketID"] = ticketID
		query["$text"] = bson.M{
			"$search": search,
		}
	}

	sort := bson.D{{"CreatedAt", -1}}

	cur, err := d.colMsg.Find(ctx, query, options.Find().SetLimit(limit).SetSkip(limit*offset).SetSort(sort))
	if err != nil {
		return nil, err
	}

	var result []model.Message
	if err := cur.All(ctx, &result); err != nil {
		return nil, err
	}

	return result, nil
}

// SetTicketStatus sets State of the given ticket to status and refreshes
// UpdatedAt.
func (d *DAL) SetTicketStatus(ctx context.Context, ticket, status string) error {
	_, err := d.colTck.UpdateByID(ctx, ticket, bson.M{
		"$set": bson.M{
			"State":     status,
			"UpdatedAt": time.Now(),
		},
	})
	return err
}

// SetRate sets Rate of the given ticket and refreshes UpdatedAt.
func (d *DAL) SetRate(ctx context.Context, ticket string, rate int) error {
	_, err := d.colTck.UpdateByID(ctx, ticket, bson.M{
		"$set": bson.M{
			"Rate":      rate,
			"UpdatedAt": time.Now(),
		},
	})
	return err
}

// SetAnswerer sets AnswererID of the given ticket and refreshes UpdatedAt.
func (d *DAL) SetAnswerer(ctx context.Context, ticket, answerer string) error {
	_, err := d.colTck.UpdateByID(ctx, ticket, bson.M{
		"$set": bson.M{
			"AnswererID": answerer,
			"UpdatedAt":  time.Now(),
		},
	})
	return err
}

// Additional is the extra user data assembled by GetAdditionalData.
type Additional struct {
	Email string
	Uid   int64
}

// Identites is the projection of a "tp-users" document read by
// GetAdditionalData.
type Identites struct {
	ID        string     `bson:"_id"`
	Identites []Identity `bson:"Identities"`
}

// Identity is a single named identity of a user.
type Identity struct {
	Name     string `bson:"Name"`
	Identity string `bson:"Identity"`
}

// UidTechs is the projection of a "profile" document read by
// GetAdditionalData.
type UidTechs struct {
	DisplayID int64  `bson:"display_id"`
	ID        string `bson:"_id"`
}

// GetAdditionalData looks up user id in two other databases of the same
// client: the identity named "email" from "bb-identity"."tp-users" and
// display_id from "profile"."profile". A failed lookup in either collection
// (including a missing document) is returned as an error.
func (d *DAL) GetAdditionalData(ctx context.Context, id string) (*Additional, error) {
	var idens Identites
	if err := d.client.Database("bb-identity").Collection("tp-users").FindOne(ctx, bson.M{
		"_id": id,
	}, options.FindOne().SetProjection(bson.D{{
		"Identities", 1,
	}})).Decode(&idens); err != nil {
		return nil, err
	}

	result := Additional{}
	for _, iden := range idens.Identites {
		// If several "email" identities exist, the last one wins.
		if iden.Name == "email" {
			result.Email = iden.Identity
		}
	}

	var u UidTechs
	if err := d.client.Database("profile").Collection("profile").FindOne(ctx, bson.M{
		"_id": id,
	}, options.FindOne().SetProjection(bson.D{{
		"display_id", 1,
	}})).Decode(&u); err != nil {
		return nil, err
	}

	result.Uid = u.DisplayID

	return &result, nil
}

// InsertAccount stores record after assigning it a new ID and timestamps and
// filling defaults for an empty Avatar, Nickname or Role. record is modified
// in place and returned on success.
func (d *DAL) InsertAccount(ctx context.Context, record *model.Account) (*model.Account, error) {
	now := time.Now()
	record.CreatedAt, record.UpdatedAt = now, now
	record.ID = xid.New().String()

	if record.Avatar == "" {
		record.Avatar = "/media/avatar/default-avatar.jpg"
	}

	if record.Nickname == "" {
		record.Nickname = "Unknown"
	}

	if record.Role == "" {
		record.Role = "user"
	}

	if _, err := d.colAcc.InsertOne(ctx, record); err != nil {
		d.logger.Emit(ErrorInsertAccount{err, record})
		return nil, err
	}

	return record, nil
}

// ErrorInsertAccount is a log event emitted when inserting an account fails.
type ErrorInsertAccount struct {
	Err     error
	Account *model.Account
}

// GetAccount returns the account whose _id is id. An empty id is rejected
// without querying the database.
func (d *DAL) GetAccount(ctx context.Context, id string) (*model.Account, error) {
	if id == "" {
		err := errors.New("id cannot be empty")
		d.logger.Emit(ErrorGetAccount{err, id})
		return nil, err
	}

	var result model.Account
	if err := d.colAcc.FindOne(ctx, bson.M{"_id": id}).Decode(&result); err != nil {
		d.logger.Emit(ErrorGetAccount{err, id})
		return nil, err
	}

	return &result, nil
}

// ErrorGetAccount is a log event emitted when an account lookup fails.
type ErrorGetAccount struct {
	Err error
	ID  string
}

// GetAccountByUserID returns the account whose userId field equals userId. An
// empty userId is rejected without querying the database.
func (d *DAL) GetAccountByUserID(ctx context.Context, userId string) (*model.Account, error) {
	if userId == "" {
		err := errors.New("userId cannot be empty")
		d.logger.Emit(ErrorGetAccount{err, userId})
		return nil, err
	}

	var result model.Account
	if err := d.colAcc.FindOne(ctx, bson.M{"userId": userId}).Decode(&result); err != nil {
		d.logger.Emit(ErrorGetAccount{err, userId})
		return nil, err
	}

	return &result, nil
}

// GetAccountPage returns one page of accounts, optionally filtered by the text
// search, together with the total number of matching accounts. offset is a
// zero-based page index: limit*offset documents are skipped.
//
// NOTE(review): the sort key is "CreatedAt" while the account fields written
// elsewhere in this file are camelCase ("updatedAt", "userId") — verify the
// stored field name.
func (d *DAL) GetAccountPage(ctx context.Context, search string, offset, limit int64) (*model.AccountPage, error) {
	// Start from an empty (not nil) document so that "no filter" is a valid query.
	query := bson.M{}
	if search != "" {
		query["$text"] = bson.M{
			"$search": search,
		}
	}

	count, err := d.colAcc.CountDocuments(ctx, query)
	if err != nil {
		return nil, err
	}

	sort := bson.D{{"CreatedAt", -1}}

	cur, err := d.colAcc.Find(ctx, query, options.Find().SetLimit(limit).SetSkip(limit*offset).SetSort(sort))
	if err != nil {
		return nil, err
	}

	var items []model.Account
	if err := cur.All(ctx, &items); err != nil {
		return nil, err
	}

	return &model.AccountPage{Count: count, Items: items}, nil
}

// rejectSetAccount logs and returns a validation error for a Set* account
// operation.
func (d *DAL) rejectSetAccount(userId, reason string) (*model.Account, error) {
	err := errors.New(reason)
	d.logger.Emit(ErrorSetAccount{err, userId})
	return nil, err
}

// applyAccountUpdate applies a "$set" of update to the account whose userId
// field equals userId and returns the document as it is after the update.
func (d *DAL) applyAccountUpdate(ctx context.Context, userId string, update bson.M) (*model.Account, error) {
	filter := bson.M{"userId": userId}
	opts := options.FindOneAndUpdate().SetReturnDocument(options.After)

	var result model.Account
	err := d.colAcc.FindOneAndUpdate(ctx, filter, bson.M{"$set": update}, opts).Decode(&result)
	if err != nil {
		d.logger.Emit(ErrorSetAccount{err, userId})
		return nil, err
	}

	return &result, nil
}

// SetAccountRole sets the role of the account of userId and returns the
// updated account. Both arguments must be non-empty.
func (d *DAL) SetAccountRole(ctx context.Context, userId, role string) (*model.Account, error) {
	if userId == "" {
		return d.rejectSetAccount(userId, "userId cannot be empty")
	}

	if role == "" {
		return d.rejectSetAccount(userId, "role cannot be empty")
	}

	return d.applyAccountUpdate(ctx, userId, bson.M{"role": role, "updatedAt": time.Now()})
}

// SetAccountNickname sets the nickname of the account of userId and returns
// the updated account. Both arguments must be non-empty.
func (d *DAL) SetAccountNickname(ctx context.Context, userId, nickname string) (*model.Account, error) {
	if userId == "" {
		return d.rejectSetAccount(userId, "userId cannot be empty")
	}

	if nickname == "" {
		return d.rejectSetAccount(userId, "nickname cannot be empty")
	}

	return d.applyAccountUpdate(ctx, userId, bson.M{"nickname": nickname, "updatedAt": time.Now()})
}

// SetAccountAvatar sets the avatar of the account of userId and returns the
// updated account. Both arguments must be non-empty.
func (d *DAL) SetAccountAvatar(ctx context.Context, userId, avatar string) (*model.Account, error) {
	if userId == "" {
		return d.rejectSetAccount(userId, "userId cannot be empty")
	}

	if avatar == "" {
		return d.rejectSetAccount(userId, "avatar cannot be empty")
	}

	return d.applyAccountUpdate(ctx, userId, bson.M{"avatar": avatar, "updatedAt": time.Now()})
}

// SetAccountDelete sets the isDeleted flag of the account of userId and
// returns the updated account. userId must be non-empty.
func (d *DAL) SetAccountDelete(ctx context.Context, userId string, isDeleted bool) (*model.Account, error) {
	if userId == "" {
		return d.rejectSetAccount(userId, "userId cannot be empty")
	}

	return d.applyAccountUpdate(ctx, userId, bson.M{"isDeleted": isDeleted, "updatedAt": time.Now()})
}

// ErrorSetAccount is a log event emitted when updating an account fails.
type ErrorSetAccount struct {
	Err    error
	UserID string
}

// DeleteAccount removes the account whose userId field equals userId and
// returns the removed document. It returns mongo.ErrNoDocuments when no such
// account exists. userId must be non-empty.
func (d *DAL) DeleteAccount(ctx context.Context, userId string) (*model.Account, error) {
	if userId == "" {
		err := errors.New("userId cannot be empty")
		d.logger.Emit(ErrorDeleteAccount{err, userId})
		return nil, err
	}

	// Find and delete in one atomic operation, so the returned document is
	// exactly the one that was removed.
	var acc model.Account
	if err := d.colAcc.FindOneAndDelete(ctx, bson.M{"userId": userId}).Decode(&acc); err != nil {
		d.logger.Emit(ErrorDeleteAccount{err, userId})
		return nil, err
	}

	return &acc, nil
}

// ErrorDeleteAccount is a log event emitted when deleting an account fails.
type ErrorDeleteAccount struct {
	Err    error
	UserID string
}