// Package dal is the MongoDB-backed data access layer for tickets and their
// messages.
package dal

import (
	"context"
	"time"

	"bitbucket.org/skeris/heruvym/model"
	"github.com/rs/xid"
	"github.com/themakers/hlog"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.mongodb.org/mongo-driver/mongo/readpref"
)

// Names of the MongoDB collections used by DAL.
const (
	collMessages = "messages"
	collTickets  = "tickets"
)

// DAL bundles the MongoDB client, the messages and tickets collections and a
// module-scoped logger.
type DAL struct {
	logger         hlog.Logger
	colMsg, colTck *mongo.Collection
	client         *mongo.Client
}

// ErrorConnectToDB is the log event emitted by New when mongo.Connect fails.
type ErrorConnectToDB struct {
	Err      error
	MongoURI string
}

// ErrorPingDB is the log event emitted by New when the initial ping fails.
type ErrorPingDB struct {
	Err      error
	MongoURI string
}

// InfoPing is the log event emitted by New after a successful ping.
// Nanoseconds is the time the ping took.
type InfoPing struct {
	MongoURI    string
	Nanoseconds int64
}

// New connects to mongoURI, pings the deployment with the PrimaryPreferred
// read preference and returns a DAL bound to the "messages" and "tickets"
// collections of database.
//
// Connection and ping failures are emitted to log and returned unchanged.
func New(
	ctx context.Context,
	mongoURI, database string,
	log hlog.Logger,
) (*DAL, error) {
	client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongoURI))
	if err != nil {
		log.Emit(ErrorConnectToDB{
			Err:      err,
			MongoURI: mongoURI,
		})
		return nil, err
	}

	start := time.Now()
	if err := client.Ping(ctx, readpref.PrimaryPreferred()); err != nil {
		log.Emit(ErrorPingDB{
			Err:      err,
			MongoURI: mongoURI,
		})
		// Best-effort cleanup so the unusable client does not leak; the ping
		// error is the one the caller needs to see.
		_ = client.Disconnect(ctx)
		return nil, err
	}

	log.Emit(InfoPing{
		MongoURI: mongoURI,
		// Measured as a real duration; Unix() would only give whole seconds.
		Nanoseconds: time.Since(start).Nanoseconds(),
	})

	db := client.Database(database)
	return &DAL{
		client: client,
		colMsg: db.Collection(collMessages),
		colTck: db.Collection(collTickets),
		logger: log.Module("DAL"),
	}, nil
}

// ErrorInsert is the log event emitted when inserting a message or a ticket
// fails.
type ErrorInsert struct {
	Err               error
	UserID, SessionID string
}

// PutMessage stores a new message with a freshly generated ID, an empty Shown
// map and the current time as CreatedAt, and returns the stored value.
//
// Insert failures are emitted to the logger and returned unchanged.
func (d *DAL) PutMessage(
	ctx context.Context,
	message, userID, sessionID, ticketID string,
	files []string,
) (*model.Message, error) {
	insertable := model.Message{
		ID:        xid.New().String(),
		UserID:    userID,
		SessionID: sessionID,
		TicketID:  ticketID,
		Message:   message,
		Files:     files,
		Shown:     map[string]int{},
		CreatedAt: time.Now(),
	}

	if _, err := d.colMsg.InsertOne(ctx, &insertable); err != nil {
		d.logger.Emit(ErrorInsert{
			Err:       err,
			UserID:    userID,
			SessionID: sessionID,
		})
		return nil, err
	}

	return &insertable, nil
}

// CreateTicket inserts a new open ticket (Rate -1) whose TopMessage is built
// from message and files, and returns the generated ticket ID.
//
// Insert failures are emitted to the logger and returned unchanged.
func (d *DAL) CreateTicket(
	ctx context.Context,
	userID, sessionID, title, message string,
	files []string,
) (string, error) {
	ticketID := xid.New().String()
	// One timestamp for both the ticket and its first message.
	now := time.Now()

	ticket := model.Ticket{
		ID:        ticketID,
		UserID:    userID,
		SessionID: sessionID,
		Title:     title,
		State:     model.StateOpen,
		CreatedAt: now,
		Rate:      -1,
		TopMessage: model.Message{
			ID:        xid.New().String(),
			UserID:    userID,
			SessionID: sessionID,
			TicketID:  ticketID,
			Message:   message,
			Files:     files,
			CreatedAt: now,
		},
	}

	if _, err := d.colTck.InsertOne(ctx, &ticket); err != nil {
		d.logger.Emit(ErrorInsert{
			Err:       err,
			UserID:    userID,
			SessionID: sessionID,
		})
		return "", err
	}

	return ticketID, nil
}

// GetTickets4Sess returns every ticket whose SessionID equals sessID.
func (d *DAL) GetTickets4Sess(ctx context.Context, sessID string) ([]model.Ticket, error) {
	cur, err := d.colTck.Find(ctx, bson.M{"SessionID": sessID})
	if err != nil {
		return nil, err
	}

	var result []model.Ticket
	if err := cur.All(ctx, &result); err != nil {
		return nil, err
	}

	return result, nil
}

// GetTicket4Sess returns the ticket with the given ID if it belongs to
// session sessID. The driver's error (mongo.ErrNoDocuments when nothing
// matches) is returned unchanged.
func (d *DAL) GetTicket4Sess(
	ctx context.Context,
	ticketID, sessID string,
) (*model.Ticket, error) {
	filter := bson.M{
		"_id":       ticketID,
		"SessionID": sessID,
	}

	var result model.Ticket
	if err := d.colTck.FindOne(ctx, filter).Decode(&result); err != nil {
		return nil, err
	}

	return &result, nil
}

// GetTicket4User returns the ticket with the given ID if it belongs to user
// userID. The driver's error (mongo.ErrNoDocuments when nothing matches) is
// returned unchanged.
func (d *DAL) GetTicket4User(
	ctx context.Context,
	ticketID, userID string,
) (*model.Ticket, error) {
	filter := bson.M{
		"_id":    ticketID,
		"UserID": userID,
	}

	var result model.Ticket
	if err := d.colTck.FindOne(ctx, filter).Decode(&result); err != nil {
		return nil, err
	}

	return &result, nil
}

// upsertEventsPipeline builds a change-stream pipeline that keeps only insert
// and update events whose full document satisfies cond.
func upsertEventsPipeline(cond bson.E) mongo.Pipeline {
	match := bson.D{
		{Key: "operationType", Value: bson.D{
			{Key: "$in", Value: bson.A{"insert", "update"}},
		}},
		cond,
	}

	// Watch requires an array of stages, hence mongo.Pipeline rather than a
	// bare bson.D.
	return mongo.Pipeline{
		{{Key: "$match", Value: match}},
	}
}

// watchUpserts opens a change stream on coll filtered by
// upsertEventsPipeline(cond) and calls handle for every event until the
// stream ends, ctx is done or handle fails.
//
// UpdateLookup is requested so that update events carry fullDocument too;
// without it the fullDocument.* condition can never match an update.
func watchUpserts(
	ctx context.Context,
	coll *mongo.Collection,
	cond bson.E,
	handle func(cs *mongo.ChangeStream) error,
) error {
	cs, err := coll.Watch(
		ctx,
		upsertEventsPipeline(cond),
		options.ChangeStream().SetFullDocument(options.UpdateLookup),
	)
	if err != nil {
		return err
	}
	// Release the server-side cursor on every exit path.
	defer cs.Close(ctx)

	for cs.Next(ctx) {
		if err := handle(cs); err != nil {
			return err
		}
	}

	// Next returning false may mean failure (including ctx cancellation).
	return cs.Err()
}

// WatchTickets streams inserted and updated tickets of user userID to yield.
//
// It blocks until the change stream ends, ctx is done or yield returns an
// error; the stream's terminal error, if any, is returned.
func (d *DAL) WatchTickets(
	ctx context.Context,
	userID string,
	yield func(ticket model.Ticket) error,
) error {
	cond := bson.E{Key: "fullDocument.UserID", Value: userID}

	return watchUpserts(ctx, d.colTck, cond, func(cs *mongo.ChangeStream) error {
		// The ticket lives under fullDocument of the change event, not at the
		// event's top level.
		var event struct {
			FullDocument *model.Ticket `bson:"fullDocument"`
		}
		if err := cs.Decode(&event); err != nil {
			return err
		}
		if event.FullDocument == nil {
			return nil
		}
		return yield(*event.FullDocument)
	})
}

// YieldUserTickets calls yield for every stored ticket of user userID and
// stops at the first error from decoding, yield or the cursor.
func (d *DAL) YieldUserTickets(
	ctx context.Context,
	userID string,
	yield func(ticket model.Ticket) error,
) error {
	cursor, err := d.colTck.Find(ctx, bson.M{"UserID": userID})
	if err != nil {
		return err
	}
	defer cursor.Close(ctx)

	for cursor.Next(ctx) {
		// Fresh value per document so fields never leak between iterations.
		var ticket model.Ticket
		if err := cursor.Decode(&ticket); err != nil {
			return err
		}
		if err := yield(ticket); err != nil {
			return err
		}
	}

	return cursor.Err()
}

// WatchActiveTickets streams inserted and updated tickets whose State is not
// model.StateClose to yield.
//
// It blocks until the change stream ends, ctx is done or yield returns an
// error; the stream's terminal error, if any, is returned.
func (d *DAL) WatchActiveTickets(ctx context.Context, yield func(ticket model.Ticket) error) error {
	cond := bson.E{
		Key:   "fullDocument.State",
		Value: bson.D{{Key: "$ne", Value: model.StateClose}},
	}

	return watchUpserts(ctx, d.colTck, cond, func(cs *mongo.ChangeStream) error {
		var event struct {
			FullDocument *model.Ticket `bson:"fullDocument"`
		}
		if err := cs.Decode(&event); err != nil {
			return err
		}
		// $ne also matches events without a full document (e.g. the ticket
		// was removed before the lookup); there is nothing to yield then.
		if event.FullDocument == nil {
			return nil
		}
		return yield(*event.FullDocument)
	})
}

// UpdateTopMessage sets TopMessage to msg and UpdatedAt to the current time
// on the ticket with the given ID.
//
// The updated document itself is discarded; decoding is only used to surface
// the operation's error (mongo.ErrNoDocuments when the ticket is missing).
func (d *DAL) UpdateTopMessage(ctx context.Context, ticketID string, msg *model.Message) error {
	filter := bson.M{"_id": ticketID}
	update := bson.M{
		"$set": bson.M{
			"UpdatedAt":  time.Now(),
			"TopMessage": msg,
		},
	}

	return d.colTck.FindOneAndUpdate(ctx, filter, update).Decode(&model.Ticket{})
}

// YieldMessages calls yield for every stored message of ticket ticketID and
// stops at the first error from decoding, yield or the cursor.
func (d *DAL) YieldMessages(
	ctx context.Context,
	ticketID string,
	yield func(ticket model.Message) error,
) error {
	// Messages live in the messages collection, not in tickets.
	cursor, err := d.colMsg.Find(ctx, bson.M{"TicketID": ticketID})
	if err != nil {
		return err
	}
	defer cursor.Close(ctx)

	for cursor.Next(ctx) {
		var msg model.Message
		if err := cursor.Decode(&msg); err != nil {
			return err
		}
		if err := yield(msg); err != nil {
			return err
		}
	}

	return cursor.Err()
}

// WatchMessages streams inserted and updated messages of ticket ticketID to
// yield.
//
// It blocks until the change stream ends, ctx is done or yield returns an
// error; the stream's terminal error, if any, is returned.
func (d *DAL) WatchMessages(
	ctx context.Context,
	ticketID string,
	yield func(ticket model.Message) error,
) error {
	cond := bson.E{Key: "fullDocument.TicketID", Value: ticketID}

	return watchUpserts(ctx, d.colMsg, cond, func(cs *mongo.ChangeStream) error {
		var event struct {
			FullDocument *model.Message `bson:"fullDocument"`
		}
		if err := cs.Decode(&event); err != nil {
			return err
		}
		if event.FullDocument == nil {
			return nil
		}
		return yield(*event.FullDocument)
	})
}

// SetShown marks message messageID as shown to user userID by setting that
// user's entry in the message's Shown map to 1.
//
// NOTE(review): assumes the Shown map is stored under the BSON key "Shown",
// like the other field names queried in this file — confirm against
// model.Message.
func (d *DAL) SetShown(ctx context.Context, messageID, userID string) error {
	filter := bson.M{"_id": messageID}
	update := bson.M{
		"$set": bson.M{
			// Dotted path targets the map entry instead of creating a
			// top-level field named after the user.
			"Shown." + userID: 1,
		},
	}

	return d.colMsg.FindOneAndUpdate(ctx, filter, update).Decode(&model.Message{})
}

// GetTicketPage returns one page of tickets, optionally filtered by state and
// by a text search for srch. An empty state or srch disables that filter.
//
// skip is multiplied by limit before being passed to the driver, i.e. it is
// treated as a zero-based page index.
//
// NOTE(review): $text needs a text index on the tickets collection — confirm
// one exists.
func (d *DAL) GetTicketPage(
	ctx context.Context,
	state, srch string,
	limit, skip int64,
) (*[]model.Ticket, error) {
	query := bson.M{}
	if state != "" {
		query["State"] = state
	}
	if srch != "" {
		query["$text"] = bson.M{"$search": srch}
	}

	cur, err := d.colTck.Find(ctx, query, options.Find().SetLimit(limit).SetSkip(skip*limit))
	if err != nil {
		return nil, err
	}

	var result []model.Ticket
	if err := cur.All(ctx, &result); err != nil {
		return nil, err
	}

	return &result, nil
}

// GetMessagesPage returns one page of messages of ticket ticketID, optionally
// narrowed by a text search for search (an empty string disables it).
//
// offset is multiplied by limit before being passed to the driver, i.e. it is
// treated as a zero-based page index.
func (d *DAL) GetMessagesPage(ctx context.Context, search, ticketID string, limit, offset int64) ([]model.Message, error) {
	query := bson.M{"TicketID": ticketID}
	if search != "" {
		query["$text"] = bson.M{"$search": search}
	}

	cur, err := d.colMsg.Find(ctx, query, options.Find().SetLimit(limit).SetSkip(limit*offset))
	if err != nil {
		return nil, err
	}

	var result []model.Message
	if err := cur.All(ctx, &result); err != nil {
		return nil, err
	}

	return result, nil
}

// SetTicketStatus sets State to status and UpdatedAt to the current time on
// the ticket with ID ticket.
func (d *DAL) SetTicketStatus(ctx context.Context, ticket, status string) error {
	// An update document must consist of update operators, hence $set.
	_, err := d.colTck.UpdateByID(ctx, ticket, bson.M{"$set": bson.M{
		"State":     status,
		"UpdatedAt": time.Now(),
	}})
	return err
}

// SetRate sets Rate to rate and UpdatedAt to the current time on the ticket
// with ID ticket.
func (d *DAL) SetRate(ctx context.Context, ticket string, rate int) error {
	_, err := d.colTck.UpdateByID(ctx, ticket, bson.M{"$set": bson.M{
		"Rate":      rate,
		"UpdatedAt": time.Now(),
	}})
	return err
}

// SetAnswerer sets AnswererID to answerer and UpdatedAt to the current time
// on the ticket with ID ticket.
func (d *DAL) SetAnswerer(ctx context.Context, ticket, answerer string) error {
	_, err := d.colTck.UpdateByID(ctx, ticket, bson.M{"$set": bson.M{
		"AnswererID": answerer,
		"UpdatedAt":  time.Now(),
	}})
	return err
}