364 lines
8.6 KiB
Go
364 lines
8.6 KiB
Go
package dal
|
||
|
||
import (
|
||
"bytes"
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"go.mongodb.org/mongo-driver/bson"
|
||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||
"go.mongodb.org/mongo-driver/mongo"
|
||
"go.mongodb.org/mongo-driver/mongo/options"
|
||
"net/http"
|
||
"time"
|
||
)
|
||
|
||
// PaymentTimeout is the maximum age (measured from CreatedAt) a payment may
// reach while still in the "open" status; ListenPayment switches older
// payments to "timeout".
const PaymentTimeout = 15 * time.Minute
|
||
|
||
type Payment struct {
|
||
ID string `json:"id" bson:"_id"`
|
||
RequesterID string `json:"requester_id" bson:"requester_id"`
|
||
UserIP string `json:"user_ip" bson:"user_ip"`
|
||
Email string `json:"email" bson:"email"`
|
||
Phone string `json:"phone" bson:"phone"`
|
||
Description string `json:"description" bson:"description"`
|
||
IsRefill bool `json:"is_refill" bson:"is_refill"` // Флаг описывает ввод/вывод средств
|
||
Status string `json:"status" bson:"status"` // Значения: open, wait, pending, in_progress, timeout,
|
||
// accepted, declined
|
||
PayWayID string `json:"payway_id" bson:"payway_id"` //
|
||
PaymentType string `json:"payment_type" bson:"payment_type"` //
|
||
ServiceID string `json:"service_id" bson:"service_id"` // ID платежа в платежном сервисе
|
||
Destination string `json:"destination" bson:"destination"` // Назначение платежа. Only for payout/withdrawal
|
||
Amount float64 `json:"amount" bson:"amount"`
|
||
Currency string `json:"currency" bson:"currency"`
|
||
IsFake bool `json:"is_fake" bson:"is_fake"`
|
||
OnAccept Action `json:"on_accept" bson:"on_accept"`
|
||
OnDecline Action `json:"on_decline" bson:"on_decline"`
|
||
OnTimeout Action `json:"on_timeout" bson:"on_timeout"`
|
||
PendedAt time.Time `json:"pended_at" bson:"pended_at"`
|
||
CreatedAt time.Time `json:"created_at" bson:"created_at"`
|
||
UpdatedAt time.Time `json:"updated_at" bson:"updated_at"`
|
||
}
|
||
|
||
// Action describes a follow-up attached to a payment outcome (see the
// OnAccept, OnDecline and OnTimeout fields of Payment).
type Action struct {
	// ActionType is one of: mail, vk, tg, sms, request.
	ActionType string `json:"action_type" bson:"action_type"`

	Target string `json:"target" bson:"target"`

	// Data is the payload of the action; for "request" actions ListenPayment
	// sends it as the HTTP request body.
	Data string `json:"data" bson:"data"`
}
|
||
|
||
func (mc *MongoConnection) InsertPayment(ctx context.Context, record *Payment) (string, error) {
|
||
err := record.Prepare()
|
||
|
||
if err != nil {
|
||
mc.hl.Emit(ErrorInsertPayment{err})
|
||
return "", nil
|
||
}
|
||
|
||
result, err := mc.coll["payment"].InsertOne(ctx, record)
|
||
|
||
if err != nil {
|
||
mc.hl.Emit(ErrorInsertPayment{err})
|
||
return "", err
|
||
}
|
||
|
||
mc.hl.Emit(InfoInsertPayment{result})
|
||
|
||
return result.InsertedID.(string), nil
|
||
}
|
||
|
||
func (mc *MongoConnection) GetPayment(ctx context.Context, id string) (*Payment, error) {
|
||
filter := bson.M{"_id": id}
|
||
|
||
var result Payment
|
||
|
||
err := mc.coll["payment"].FindOne(ctx, filter).Decode(&result)
|
||
|
||
if err == mongo.ErrNoDocuments {
|
||
return nil, nil
|
||
} else {
|
||
if err != nil {
|
||
mc.hl.Emit(ErrorGetPayment{err})
|
||
return nil, err
|
||
}
|
||
}
|
||
|
||
mc.hl.Emit(InfoGetPayment{id})
|
||
|
||
return &result, err
|
||
}
|
||
|
||
func (mc *MongoConnection) GetPaymentByServiceID(ctx context.Context, serviceId, isRefill string) (*Payment,
|
||
error) {
|
||
filter := bson.M{"api_id": serviceId}
|
||
|
||
switch isRefill {
|
||
case "yes", "true":
|
||
filter["is_refill"] = true
|
||
case "no", "false":
|
||
filter["is_refill"] = false
|
||
}
|
||
|
||
var result Payment
|
||
|
||
err := mc.coll["payment"].FindOne(ctx, filter).Decode(&result)
|
||
|
||
if err == mongo.ErrNoDocuments {
|
||
return nil, nil
|
||
} else {
|
||
if err != nil {
|
||
mc.hl.Emit(ErrorGetPayment{err})
|
||
}
|
||
}
|
||
|
||
mc.hl.Emit(InfoGetPayment{fmt.Sprintf("by service_id %v", serviceId)})
|
||
|
||
return &result, err
|
||
}
|
||
|
||
func (mc *MongoConnection) GetPaymentListByStatus(ctx context.Context, status, isRefill string) ([]Payment,
|
||
error) {
|
||
err := CheckPaymentStatus(status)
|
||
|
||
if err != nil {
|
||
mc.hl.Emit(ErrorGetPayment{err})
|
||
return nil, nil
|
||
}
|
||
|
||
filter := bson.M{"status": status}
|
||
|
||
switch isRefill {
|
||
case "yes", "true":
|
||
filter["is_refill"] = true
|
||
case "no", "false":
|
||
filter["is_refill"] = false
|
||
}
|
||
|
||
cursor, err := mc.coll["payment"].Find(ctx, filter)
|
||
|
||
if err != nil {
|
||
mc.hl.Emit(ErrorGetPayment{err})
|
||
return nil, nil
|
||
}
|
||
|
||
var results []Payment
|
||
err = cursor.All(ctx, &results)
|
||
|
||
if err != nil {
|
||
mc.hl.Emit(ErrorGetPayment{err})
|
||
return nil, nil
|
||
}
|
||
|
||
mc.hl.Emit(InfoGetPayment{fmt.Sprintf("by status %v", status)})
|
||
|
||
return results, nil
|
||
}
|
||
|
||
func (mc *MongoConnection) UpdatePaymentStatus(ctx context.Context, id, status string) error {
|
||
err := CheckPaymentStatus(status)
|
||
|
||
if err != nil {
|
||
mc.hl.Emit(ErrorUpdatePayment{err})
|
||
return err
|
||
}
|
||
|
||
update := bson.M{
|
||
"updated_at": time.Now(),
|
||
"status": status,
|
||
}
|
||
|
||
result, err := mc.coll["payment"].UpdateByID(ctx, id, bson.D{{"$set", update}})
|
||
if err != nil {
|
||
mc.hl.Emit(ErrorUpdatePayment{err})
|
||
return err
|
||
}
|
||
|
||
mc.hl.Emit(InfoUpdatePayment{result})
|
||
|
||
return nil
|
||
}
|
||
|
||
func (mc *MongoConnection) UpdatePaymentServiceID(ctx context.Context, id, serviceId string) error {
|
||
update := bson.M{
|
||
"updated_at": time.Now(),
|
||
"service_id": serviceId,
|
||
}
|
||
|
||
result, err := mc.coll["payment"].UpdateByID(ctx, id, bson.D{{"$set", update}})
|
||
if err != nil {
|
||
mc.hl.Emit(ErrorUpdatePayment{err})
|
||
return err
|
||
}
|
||
|
||
mc.hl.Emit(InfoUpdatePayment{result})
|
||
return nil
|
||
}
|
||
|
||
func (mc *MongoConnection) ListenPayment(ctx context.Context) {
|
||
operationTypes := []bson.D{{{"operationType", "update"}}}
|
||
|
||
matchStage := bson.D{
|
||
{"$match", bson.D{{"$or", operationTypes}}},
|
||
}
|
||
|
||
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
|
||
|
||
changeStream, err := mc.coll["payment"].Watch(ctx,
|
||
mongo.Pipeline{matchStage}, opts)
|
||
|
||
if err != nil {
|
||
mc.hl.Emit(ErrorListenPayment{err})
|
||
return
|
||
}
|
||
|
||
// routine: SetTimeout - каждые 3 минуты проверяет БД на наличие записей с истекшим PaymentTimeout
|
||
go func() {
|
||
for {
|
||
paymentList, err := mc.GetPaymentListByStatus(ctx, "open", "")
|
||
|
||
if err != nil {
|
||
mc.hl.Emit(ErrorListenPayment{err})
|
||
} else {
|
||
for _, payment := range paymentList {
|
||
if time.Since(payment.CreatedAt) >= PaymentTimeout {
|
||
err = mc.UpdatePaymentStatus(ctx, payment.ID, "timeout")
|
||
}
|
||
|
||
if err != nil {
|
||
mc.hl.Emit(ErrorListenPayment{err})
|
||
}
|
||
}
|
||
}
|
||
|
||
// Прерывание
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-time.After(3 * time.Minute): // Сладкий сон
|
||
continue
|
||
}
|
||
}
|
||
}()
|
||
|
||
// routine: UpdateListener
|
||
go func() {
|
||
// Перехват паники (см. ниже)
|
||
defer func() {
|
||
if err := recover(); err != nil {
|
||
mc.hl.Emit(ErrorListenPayment{err.(error)})
|
||
}
|
||
}()
|
||
|
||
for changeStream.Next(ctx) {
|
||
// При закрытии приложения происходит паника (change_stream.go 561). context не важен...'
|
||
current := changeStream.Current
|
||
|
||
if err != nil {
|
||
mc.hl.Emit(ErrorListenPayment{err})
|
||
return
|
||
}
|
||
|
||
var payment Payment
|
||
|
||
err = bson.Unmarshal(current.Lookup("fullDocument").Value, &payment)
|
||
if err != nil {
|
||
mc.hl.Emit(ErrorListenPayment{err})
|
||
return
|
||
}
|
||
var action Action
|
||
switch payment.Status {
|
||
case "accepted":
|
||
action = payment.OnAccept
|
||
case "timeout":
|
||
action = payment.OnTimeout
|
||
case "declined":
|
||
action = payment.OnDecline
|
||
}
|
||
|
||
// Some work with action
|
||
if payment.Status == "accepted" && payment.IsRefill {
|
||
switch action.ActionType {
|
||
case "request":
|
||
fmt.Println("CYCLE", payment )
|
||
buf := bytes.Buffer{}
|
||
buf.Write([]byte(action.Data))
|
||
if _, err := http.Post("https://fynrods.ru/bet/increase", "application/json", &buf); err != nil {
|
||
fmt.Println("ERRINC", err )
|
||
}
|
||
}
|
||
}
|
||
|
||
mc.hl.Emit(InfoListenPayment{fmt.Sprintf("%v %v done", payment.ID, payment.Status)})
|
||
}
|
||
if err = changeStream.Err(); err != nil {
|
||
mc.hl.Emit(ErrorListenPayment{err})
|
||
}
|
||
}()
|
||
|
||
select {
|
||
case <-ctx.Done():
|
||
err = changeStream.Close(context.TODO())
|
||
if err != nil {
|
||
mc.hl.Emit(ErrorListenPayment{err})
|
||
}
|
||
return
|
||
}
|
||
}
|
||
|
||
func (p *Payment) Prepare() error {
|
||
err := CheckPaymentStatus(p.Status)
|
||
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
err = CheckActionType(p.OnAccept.ActionType)
|
||
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
err = CheckActionType(p.OnDecline.ActionType)
|
||
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
err = CheckActionType(p.OnTimeout.ActionType)
|
||
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
now := time.Now()
|
||
|
||
if p.ID == "" {
|
||
p.ID = primitive.NewObjectIDFromTimestamp(now).Hex()
|
||
}
|
||
|
||
p.CreatedAt = now
|
||
p.UpdatedAt = now
|
||
|
||
return nil
|
||
}
|
||
|
||
// CheckPaymentStatus reports whether status is a known payment status.
//
// It returns nil for open, wait, pending, timeout, accepted, declined and
// in_progress, and an error naming the rejected value otherwise.
func CheckPaymentStatus(status string) error {
	switch status {
	case "open", "wait", "pending", "timeout", "accepted", "declined", "in_progress":
		return nil
	default:
		return errors.New("got bad status: " + status)
	}
}
|
||
|
||
// CheckActionType reports whether aType is a known action type.
//
// It returns nil for mail, vk, tg, sms and request, and an error naming the
// rejected value otherwise.
func CheckActionType(aType string) error {
	switch aType {
	case "mail", "vk", "tg", "sms", "request":
		return nil
	default:
		return errors.New("got bad type: " + aType)
	}
}
|