This commit is contained in:
skeris 2024-08-28 15:59:45 +03:00
parent 0a55a10baa
commit 0401a4829d
14 changed files with 99 additions and 99 deletions

4
go.mod

@ -20,7 +20,8 @@ require (
google.golang.org/grpc v1.60.1 google.golang.org/grpc v1.60.1
google.golang.org/protobuf v1.32.0 google.golang.org/protobuf v1.32.0
penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240607202348-efe5f2bf3e8c penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240607202348-efe5f2bf3e8c
penahub.gitlab.yandexcloud.net/external/trashlog.git v0.1.2-0.20240523172059-9bbe8a9faa31 penahub.gitlab.yandexcloud.net/devops/linters/golang.git v0.0.0-20240803124813-79e62d2acf3c
penahub.gitlab.yandexcloud.net/external/trashlog v0.1.5
) )
require ( require (
@ -63,5 +64,4 @@ require (
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/tucnak/telebot.v2 v2.5.0 // indirect gopkg.in/tucnak/telebot.v2 v2.5.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
penahub.gitlab.yandexcloud.net/devops/linters/golang.git v0.0.0-20240725131128-102f5d56f156 // indirect
) )

8
go.sum

@ -293,7 +293,7 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240607202348-efe5f2bf3e8c h1:CWb4UcuNXhd1KTNOmy2U0TJO4+Qxgxrj5cwkyFqbgrk= penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240607202348-efe5f2bf3e8c h1:CWb4UcuNXhd1KTNOmy2U0TJO4+Qxgxrj5cwkyFqbgrk=
penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240607202348-efe5f2bf3e8c/go.mod h1:+bPxq2wfW5S1gd+83vZYmHm33AE7nEBfznWS8AM1TKE= penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240607202348-efe5f2bf3e8c/go.mod h1:+bPxq2wfW5S1gd+83vZYmHm33AE7nEBfznWS8AM1TKE=
penahub.gitlab.yandexcloud.net/devops/linters/golang.git v0.0.0-20240725131128-102f5d56f156 h1:IpItA0b6uvawjNk3KzI5X92Z7Iwn1FX7s2TfcJ8tRus= penahub.gitlab.yandexcloud.net/devops/linters/golang.git v0.0.0-20240803124813-79e62d2acf3c h1:imtXaIVscs8it6SfAmDxjNxqQSF44GgCTl1N6JT6unA=
penahub.gitlab.yandexcloud.net/devops/linters/golang.git v0.0.0-20240725131128-102f5d56f156/go.mod h1:LIcPclsOvgAKGuuMYXHDXVMx99WoGWHdmcc4Bgoftyk= penahub.gitlab.yandexcloud.net/devops/linters/golang.git v0.0.0-20240803124813-79e62d2acf3c/go.mod h1:i7M72RIpkSjcQtHID6KKj9RT/EYZ1rxS6tIPKWa/BSY=
penahub.gitlab.yandexcloud.net/external/trashlog.git v0.1.2-0.20240523172059-9bbe8a9faa31 h1:WlRVJnzU0sti+qBq/JTCgFPU0RoxIqGHu7hzDirxE2k= penahub.gitlab.yandexcloud.net/external/trashlog v0.1.5 h1:amsK0bkSJxBisk334aFo5ZmVPvN1dBT0Sv5j3V5IsT8=
penahub.gitlab.yandexcloud.net/external/trashlog.git v0.1.2-0.20240523172059-9bbe8a9faa31/go.mod h1:3ml0dAGT8U8RhpevKBfRgG6yKZum8EI2uJxAb2WCIy4= penahub.gitlab.yandexcloud.net/external/trashlog v0.1.5/go.mod h1:J8kQNEP4bL7ZNKHxuT4tfe6a3FHyovpAPkyytN4qllc=

@ -22,8 +22,8 @@ import (
"github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/app" "penahub.gitlab.yandexcloud.net/external/trashlog/app"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/wrappers/zaptrashlog" "penahub.gitlab.yandexcloud.net/external/trashlog/wrappers/zaptrashlog"
"time" "time"
) )

26
vendor/modules.txt vendored

@ -367,17 +367,17 @@ gopkg.in/yaml.v3
## explicit; go 1.21 ## explicit; go 1.21
penahub.gitlab.yandexcloud.net/backend/penahub_common/log_mw penahub.gitlab.yandexcloud.net/backend/penahub_common/log_mw
penahub.gitlab.yandexcloud.net/backend/penahub_common/mongo penahub.gitlab.yandexcloud.net/backend/penahub_common/mongo
# penahub.gitlab.yandexcloud.net/devops/linters/golang.git v0.0.0-20240725131128-102f5d56f156 # penahub.gitlab.yandexcloud.net/devops/linters/golang.git v0.0.0-20240803124813-79e62d2acf3c
## explicit; go 1.22.2 ## explicit; go 1.22.0
penahub.gitlab.yandexcloud.net/devops/linters/golang.git/pkg/dummy penahub.gitlab.yandexcloud.net/devops/linters/golang.git/pkg/dummy
# penahub.gitlab.yandexcloud.net/external/trashlog.git v0.1.2-0.20240523172059-9bbe8a9faa31 # penahub.gitlab.yandexcloud.net/external/trashlog v0.1.5
## explicit; go 1.18 ## explicit; go 1.22.0
penahub.gitlab.yandexcloud.net/external/trashlog.git/app penahub.gitlab.yandexcloud.net/external/trashlog/app
penahub.gitlab.yandexcloud.net/external/trashlog.git/dal/bbolt penahub.gitlab.yandexcloud.net/external/trashlog/dal/bbolt
penahub.gitlab.yandexcloud.net/external/trashlog.git/dal/clickhouse penahub.gitlab.yandexcloud.net/external/trashlog/dal/clickhouse
penahub.gitlab.yandexcloud.net/external/trashlog.git/model penahub.gitlab.yandexcloud.net/external/trashlog/model
penahub.gitlab.yandexcloud.net/external/trashlog.git/proto/generated penahub.gitlab.yandexcloud.net/external/trashlog/proto/generated
penahub.gitlab.yandexcloud.net/external/trashlog.git/sink penahub.gitlab.yandexcloud.net/external/trashlog/sink
penahub.gitlab.yandexcloud.net/external/trashlog.git/version penahub.gitlab.yandexcloud.net/external/trashlog/version
penahub.gitlab.yandexcloud.net/external/trashlog.git/wrappers/zaptg penahub.gitlab.yandexcloud.net/external/trashlog/wrappers/zaptg
penahub.gitlab.yandexcloud.net/external/trashlog.git/wrappers/zaptrashlog penahub.gitlab.yandexcloud.net/external/trashlog/wrappers/zaptrashlog

@ -9,11 +9,11 @@ import (
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
"google.golang.org/grpc" "google.golang.org/grpc"
"net" "net"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/dal/clickhouse" "penahub.gitlab.yandexcloud.net/external/trashlog/dal/clickhouse"
trashlogProto "penahub.gitlab.yandexcloud.net/external/trashlog.git/proto/generated" trashlogProto "penahub.gitlab.yandexcloud.net/external/trashlog/proto/generated"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/sink" "penahub.gitlab.yandexcloud.net/external/trashlog/sink"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/version" "penahub.gitlab.yandexcloud.net/external/trashlog/version"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/wrappers/zaptg" "penahub.gitlab.yandexcloud.net/external/trashlog/wrappers/zaptg"
) )
type App struct { type App struct {
@ -139,11 +139,6 @@ func New(ctx context.Context, opts interface{}, ver appInit.Version) (appInit.Co
return nil, err return nil, err
} }
log.With(map[string]string{
"test": "test2",
"test1": "test3",
})
sinkSvc := sink.New(log, logStore) sinkSvc := sink.New(log, logStore)
var grpcOpts []grpc.ServerOption var grpcOpts []grpc.ServerOption

@ -6,7 +6,7 @@ import (
"github.com/rs/xid" "github.com/rs/xid"
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
"os" "os"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/model" "penahub.gitlab.yandexcloud.net/external/trashlog/model"
"time" "time"
) )

@ -6,8 +6,8 @@ import (
"fmt" "fmt"
"github.com/ClickHouse/clickhouse-go" "github.com/ClickHouse/clickhouse-go"
"github.com/themakers/hlog" "github.com/themakers/hlog"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/dal/bbolt" "penahub.gitlab.yandexcloud.net/external/trashlog/dal/bbolt"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/model" "penahub.gitlab.yandexcloud.net/external/trashlog/model"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -177,6 +177,16 @@ type ErrorInsertTransaction struct {
func (d *DAL) batchRecords(ctx context.Context) { func (d *DAL) batchRecords(ctx context.Context) {
defer func() {
val := recover()
if val != nil {
if err, ok := val.(error); ok {
d.logger.Emit(ErrorPanicBatchRecords{Err: err})
} else {
d.logger.Emit(WarnPanicBatch{Message: fmt.Sprint(val)})
}
}
}()
ticker := time.NewTicker(time.Second) ticker := time.NewTicker(time.Second)
for { for {
@ -188,6 +198,7 @@ func (d *DAL) batchRecords(ctx context.Context) {
d.buffer = append(d.buffer, r) d.buffer = append(d.buffer, r)
d.bufM.Unlock() d.bufM.Unlock()
case <-ctx.Done(): case <-ctx.Done():
fmt.Println("INSCTXDone")
ticker.Stop() ticker.Stop()
return return
} }
@ -386,6 +397,7 @@ type ErrorExecClickhouse struct {
type DebugNeedMutex struct{} type DebugNeedMutex struct{}
func (d *DAL) PutRecord(record model.Record) { func (d *DAL) PutRecord(record model.Record) {
fmt.Println("RECORDSTREAMSIZE", len(d.recordsStream))
d.recordsStream <- record d.recordsStream <- record
} }

@ -2,10 +2,11 @@ package sink
import ( import (
"context" "context"
"fmt"
"github.com/themakers/hlog" "github.com/themakers/hlog"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/dal/clickhouse" "penahub.gitlab.yandexcloud.net/external/trashlog/dal/clickhouse"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/model" "penahub.gitlab.yandexcloud.net/external/trashlog/model"
trashlogProto "penahub.gitlab.yandexcloud.net/external/trashlog.git/proto/generated" trashlogProto "penahub.gitlab.yandexcloud.net/external/trashlog/proto/generated"
) )
type Sink struct { type Sink struct {
@ -88,6 +89,7 @@ func (s *Sink) Valve(stream trashlogProto.Trashlog_ValveServer) error {
for { for {
record, err := stream.Recv() record, err := stream.Recv()
if err != nil { if err != nil {
fmt.Println("RECV ERR", err)
return return
} }
dataChan <- record dataChan <- record
@ -100,8 +102,10 @@ func (s *Sink) Valve(stream trashlogProto.Trashlog_ValveServer) error {
if err := stream.SendAndClose(&trashlogProto.Dummy{}); err != nil { if err := stream.SendAndClose(&trashlogProto.Dummy{}); err != nil {
return err return err
} }
fmt.Println("CLOSEVALVE")
return nil return nil
case record := <-dataChan: case record := <-dataChan:
fmt.Println("DATATATATA", record)
s.store.PutRecord(Dto2daoRecord(record)) s.store.PutRecord(Dto2daoRecord(record))
} }
} }
@ -143,7 +147,11 @@ func Dto2daoValue(in *trashlogProto.Value) interface{} {
case *trashlogProto.Value_Num: case *trashlogProto.Value_Num:
return in.GetNum() return in.GetNum()
case *trashlogProto.Value_Flag: case *trashlogProto.Value_Flag:
return in.GetFlag() fmt.Println("Flaag", in.GetFlag())
if in.GetFlag() {
return uint8(1)
}
return uint8(0)
} }
return nil return nil

@ -7,7 +7,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
tb "gopkg.in/tucnak/telebot.v2" tb "gopkg.in/tucnak/telebot.v2"
pb "penahub.gitlab.yandexcloud.net/external/trashlog.git/proto/generated" pb "penahub.gitlab.yandexcloud.net/external/trashlog/proto/generated"
"strings" "strings"
) )

@ -8,10 +8,10 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/backoff" "google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/dal/bbolt" "penahub.gitlab.yandexcloud.net/external/trashlog/dal/bbolt"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/model" "penahub.gitlab.yandexcloud.net/external/trashlog/model"
pb "penahub.gitlab.yandexcloud.net/external/trashlog.git/proto/generated" pb "penahub.gitlab.yandexcloud.net/external/trashlog/proto/generated"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/sink" "penahub.gitlab.yandexcloud.net/external/trashlog/sink"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -80,10 +80,8 @@ func NewCore(
return &tlc, err return &tlc, err
} }
worker := NewWorker(&tlc) go tlc.SendWorker()
go worker.EmitWorker() go tlc.Reconnect()
go worker.SendWorker()
go worker.Reconnect()
return &tlc, nil return &tlc, nil
} }
@ -199,6 +197,8 @@ func (c *TrashLogCore) With(fields []zapcore.Field) zapcore.Core {
recoverStore: c.recoverStore, recoverStore: c.recoverStore,
url: c.url, url: c.url,
sendMutex: c.sendMutex, sendMutex: c.sendMutex,
failCh: c.failCh,
emitCh: c.emitCh,
} }
} }
@ -210,8 +210,11 @@ func convertValue(v interface{}) *pb.Value {
return &pb.Value{Value: &pb.Value_Num{Num: t}} return &pb.Value{Value: &pb.Value_Num{Num: t}}
case float64: case float64:
return &pb.Value{Value: &pb.Value_Double{Double: float32(t)}} return &pb.Value{Value: &pb.Value_Double{Double: float32(t)}}
case bool: case uint8:
return &pb.Value{Value: &pb.Value_Flag{Flag: t}} if t > uint8(0) {
return &pb.Value{Value: &pb.Value_Flag{Flag: true}}
}
return &pb.Value{Value: &pb.Value_Flag{Flag: false}}
} }
return nil return nil
@ -231,10 +234,11 @@ func (c *TrashLogCore) Write(
entry zapcore.Entry, entry zapcore.Entry,
fields []zapcore.Field, fields []zapcore.Field,
) error { ) error {
c.emitCh <- entryWithFields{ fmt.Println("Write()")
entry: entry, c.sendMutex.Lock()
fields: fields, defer c.sendMutex.Unlock()
} c.saveFields(entry, fields)
fmt.Println("Write()end")
return nil return nil
} }
@ -257,77 +261,58 @@ func fieldsToMap(fields []zapcore.Field) map[string]interface{} {
return m return m
} }
type Worker struct { func (c *TrashLogCore) SendWorker() {
core *TrashLogCore
recordCh chan pb.Record
}
func NewWorker(core *TrashLogCore) *Worker {
return &Worker{
core: core,
}
}
func (w *Worker) EmitWorker() {
for {
select {
case <-w.core.ctx.Done():
return
case emit := <-w.core.emitCh:
w.saveFields(emit.entry, emit.fields)
}
}
}
func (w *Worker) SendWorker() {
ticker := time.NewTicker(5 * time.Second) ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
w.fetchBolt() c.fetchBolt()
case <-w.core.ctx.Done(): case <-c.ctx.Done():
return return
} }
} }
} }
func (w *Worker) saveFields(entry zapcore.Entry, fields []zapcore.Field) { func (c *TrashLogCore) saveFields(entry zapcore.Entry, fields []zapcore.Field) {
record := w.prepareRecord(entry, fields) fmt.Println("saveFields()")
err := w.core.recoverStore.PutRecord(sink.Dto2daoRecord(&record)) record := c.prepareRecord(entry, fields)
fmt.Println("saveFields1()")
err := c.recoverStore.PutRecord(sink.Dto2daoRecord(&record))
fmt.Println("saveFields2()")
if err != nil { if err != nil {
fmt.Println("ошибка сохранения записи в болт", err) fmt.Println("ошибка сохранения записи в болт", err)
return return
} }
} }
func (w *Worker) fetchBolt() { func (c *TrashLogCore) fetchBolt() {
records, err := w.core.recoverStore.GetAllRecords() records, err := c.recoverStore.GetAllRecords()
if err != nil { if err != nil {
fmt.Println("ошибка при получении всех записей из болта", err) fmt.Println("ошибка при получении всех записей из болта", err)
return return
} }
for key, record := range records { for key, record := range records {
if w.core.stream != nil { if c.stream != nil {
rec := w.convertToProto(record) rec := c.convertToProto(record)
err = w.core.stream.Send(rec) err = c.stream.Send(rec)
if err == nil { if err == nil {
err = w.core.recoverStore.DeleteRecordByKey([]byte(key)) err = c.recoverStore.DeleteRecordByKey([]byte(key))
if err != nil { if err != nil {
fmt.Println("ошибка при удалении записи из хранилища по ключу", err) fmt.Println("ошибка при удалении записи из хранилища по ключу", err)
continue continue
} }
} else { } else {
fmt.Println("ошибка отправки записи в trashlog", err) fmt.Println("ошибка отправки записи в trashlog", err)
w.core.failCh <- true c.failCh <- true
return return
} }
} }
} }
} }
func (w *Worker) prepareRecord(entry zapcore.Entry, fields []zapcore.Field) pb.Record { func (c *TrashLogCore) prepareRecord(entry zapcore.Entry, fields []zapcore.Field) pb.Record {
fieldMap := fieldsToMap(fields) fieldMap := fieldsToMap(fields)
splittedMessage := strings.Split(entry.Message, "!") splittedMessage := strings.Split(entry.Message, "!")
var msg string var msg string
@ -340,11 +325,11 @@ func (w *Worker) prepareRecord(entry zapcore.Entry, fields []zapcore.Field) pb.R
keyFields, ctxFields := make(map[string]*pb.Value), make(map[string]*pb.Value) keyFields, ctxFields := make(map[string]*pb.Value), make(map[string]*pb.Value)
for k, v := range w.core.keyFields { for k, v := range c.keyFields {
keyFields[k] = v keyFields[k] = v
} }
for k, v := range w.core.ctxFields { for k, v := range c.ctxFields {
ctxFields[k] = v ctxFields[k] = v
} }
@ -368,16 +353,16 @@ func (w *Worker) prepareRecord(entry zapcore.Entry, fields []zapcore.Field) pb.R
KeyFields: keyFields, KeyFields: keyFields,
CtxFields: ctxFields, CtxFields: ctxFields,
SvcFields: &pb.SvcData{ SvcFields: &pb.SvcData{
BuildTime: w.core.svcData.BuildTime, BuildTime: c.svcData.BuildTime,
Version: w.core.svcData.Version, Version: c.svcData.Version,
Commit: w.core.svcData.Commit, Commit: c.svcData.Commit,
File: entry.Caller.File, File: entry.Caller.File,
Line: uint64(entry.Caller.Line), Line: uint64(entry.Caller.Line),
}, },
} }
} }
func (w *Worker) convertToProto(record model.Record) *pb.Record { func (c *TrashLogCore) convertToProto(record model.Record) *pb.Record {
keyFields, ctxFields := make(map[string]*pb.Value), make(map[string]*pb.Value) keyFields, ctxFields := make(map[string]*pb.Value), make(map[string]*pb.Value)
for k, v := range record.KeyFields { for k, v := range record.KeyFields {
keyFields[k] = convertValue(v) keyFields[k] = convertValue(v)
@ -405,12 +390,12 @@ func (w *Worker) convertToProto(record model.Record) *pb.Record {
} }
} }
func (w *Worker) Reconnect() { func (c *TrashLogCore) Reconnect() {
select { select {
case <-w.core.ctx.Done(): case <-c.ctx.Done():
return return
case <-w.core.failCh: case <-c.failCh:
if err := w.core.connectToTrashlog(w.core.url); err == nil { if err := c.connectToTrashlog(c.url); err == nil {
fmt.Println("Reconnect successful") fmt.Println("Reconnect successful")
} }
} }