-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathinterceptor.go
160 lines (145 loc) · 4.55 KB
/
interceptor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package emongo
import (
"encoding/json"
"errors"
"fmt"
"log"
"runtime"
"strconv"
"strings"
"time"
"github.com/gotomicro/ego/core/eapp"
"github.com/gotomicro/ego/core/elog"
"github.com/gotomicro/ego/core/emetric"
"github.com/gotomicro/ego/core/util/xdebug"
"go.mongodb.org/mongo-driver/mongo"
)
// metricType is the first label value passed to the emetric client
// counter and histogram by the interceptors in this file.
const metricType = "mongo"
// Interceptor is a middleware constructor: it receives the process function
// that would otherwise be invoked and returns the process function that
// should be invoked in its place.
type Interceptor func(next processFn) processFn
// InterceptorChain combines interceptors into a single wrapper function.
//
// The returned function wraps oldProcess so that interceptors[0] is the
// outermost layer (it runs first and finishes last) and the last interceptor
// sits directly around oldProcess. With no interceptors, oldProcess is
// returned unchanged.
func InterceptorChain(interceptors ...Interceptor) func(oldProcess processFn) processFn {
	return func(oldProcess processFn) processFn {
		wrapped := oldProcess
		// Apply from the tail so that the head of the list ends up outermost.
		for idx := len(interceptors); idx > 0; idx-- {
			wrapped = interceptors[idx-1](wrapped)
		}
		return wrapped
	}
}
// debugInterceptor returns an interceptor that prints every command to the
// standard logger when the application runs in development mode.
//
// Outside development mode the wrapped process function is called directly
// and nothing is printed. In development mode the call is timed and a line
// prefixed with "emongo.response" is printed, built by xdebug from the call
// site, compName, the configured key name, the elapsed time, the command
// name with its JSON-encoded request, and either the response or the error
// text. The error from the wrapped function is returned unchanged.
func debugInterceptor(compName string, c *config) func(processFn) processFn {
	return func(next processFn) processFn {
		return func(cmd *cmd) error {
			if !eapp.IsDevelopmentMode() {
				return next(cmd)
			}

			start := time.Now()
			err := next(cmd)
			elapsed := time.Since(start)

			// fileWithLineNum inspects the call stack starting two frames up,
			// so it has to be called from this closure, not from a helper.
			callSite := fileWithLineNum()
			key := fmt.Sprintf("%v", c.keyName)
			request := fmt.Sprintf("%s %v", cmd.name, mustJsonMarshal(cmd.req))

			if err == nil {
				response := fmt.Sprintf("%v", cmd.res)
				log.Println("emongo.response",
					xdebug.MakeReqAndResInfo(callSite, compName, key, elapsed, request, response),
				)
				return nil
			}

			log.Println("emongo.response",
				xdebug.MakeReqAndResError(callSite, compName, key, elapsed, request, err.Error()),
			)
			return err
		}
	}
}
// metricInterceptor returns an interceptor that records one counter
// increment and one latency observation for every command.
//
// The counter status label is "OK" when the wrapped function returns nil,
// "Empty" when the error matches mongo.ErrNoDocuments, and "Error" for any
// other error. The histogram observes the elapsed time in seconds. The error
// from the wrapped function is returned unchanged. The logger argument is
// not used by this interceptor.
func metricInterceptor(compName string, c *config, logger *elog.Component) func(processFn) processFn {
	return func(next processFn) processFn {
		return func(cmd *cmd) error {
			start := time.Now()
			err := next(cmd)
			elapsed := time.Since(start)

			var status string
			switch {
			case err == nil:
				status = "OK"
			case errors.Is(err, mongo.ErrNoDocuments):
				status = "Empty"
			default:
				status = "Error"
			}

			emetric.ClientHandleCounter.Inc(metricType, compName, cmd.name, c.keyName, status)
			emetric.ClientHandleHistogram.
				WithLabelValues(metricType, compName, cmd.name, c.keyName).
				Observe(elapsed.Seconds())
			return err
		}
	}
}
// accessInterceptor returns an interceptor that writes an "access" log entry
// for commands, depending on the outcome and on the configuration in c.
//
// Every entry carries the command name, elapsed time, database name and
// collection name. The request is added when EnableAccessInterceptorReq is
// set, and the response when EnableAccessInterceptorRes is set and the call
// succeeded. A call counts as slow when SlowLogThreshold is positive and the
// elapsed time exceeds it; the event field is then "slow" instead of
// "normal".
//
// Logging rules:
//   - success: logged only when EnableAccessInterceptor is set or the call
//     was slow (Warn when slow, Info otherwise);
//   - mongo.ErrNoDocuments: logged at Warn under the same condition;
//   - any other error: always logged at Error, and the request is always
//     included.
//
// The error from the wrapped function is returned unchanged. The compName
// argument is not used by this interceptor.
func accessInterceptor(compName string, c *config, logger *elog.Component) func(processFn) processFn {
	return func(next processFn) processFn {
		return func(cmd *cmd) error {
			start := time.Now()
			err := next(cmd)
			elapsed := time.Since(start)

			fields := append(make([]elog.Field, 0, 15),
				elog.FieldMethod(cmd.name),
				elog.FieldCost(elapsed),
				elog.FieldKey(cmd.dbName),
				elog.String("collName", cmd.collName),
				elog.String("cmdName", cmd.name),
			)
			if c.EnableAccessInterceptorReq {
				fields = append(fields, elog.Any("req", cmd.req))
			}
			if err == nil && c.EnableAccessInterceptorRes {
				fields = append(fields, elog.Any("res", cmd.res))
			}

			slow := c.SlowLogThreshold > 0 && elapsed > c.SlowLogThreshold
			event := "normal"
			if slow {
				event = "slow"
			}
			wanted := c.EnableAccessInterceptor || slow

			switch {
			case err == nil:
				if wanted {
					fields = append(fields, elog.FieldEvent(event))
					if slow {
						logger.Warn("access", fields...)
					} else {
						logger.Info("access", fields...)
					}
				}
				return nil

			case errors.Is(err, mongo.ErrNoDocuments):
				fields = append(fields, elog.FieldEvent(event), elog.FieldErr(err))
				// These entries can be very frequent and are rarely useful, so
				// they are only written when access logging is enabled or the
				// call was slow.
				if wanted {
					logger.Warn("access", fields...)
				}
				return err

			default:
				fields = append(fields, elog.FieldEvent(event), elog.FieldErr(err))
				// If the user did not enable request logging, an error must
				// still record the request.
				if !c.EnableAccessInterceptorReq {
					fields = append(fields, elog.Any("req", cmd.req))
				}
				logger.Error("access", fields...)
				return err
			}
		}
	}
}
// mustJsonMarshal renders val as a string for debug output.
//
// The normal result is the JSON encoding of val. When val cannot be encoded
// as JSON (for example it contains a channel or func), the value is rendered
// with fmt's %+v verb instead, so the debug line still shows the value
// rather than an empty string. The function never panics on an encoding
// error and never returns an error.
func mustJsonMarshal(val interface{}) string {
	res, err := json.Marshal(val)
	if err != nil {
		// Best-effort fallback: a readable dump is more useful in a debug
		// line than silently dropping the value.
		return fmt.Sprintf("%+v", val)
	}
	return string(res)
}
// fileWithLineNum returns "file:line" for the first suitable frame on the
// call stack, or "" when none is found.
//
// The search starts two frames above this function and inspects stack depths
// 2 through 14, stopping early if the stack ends. A frame is skipped when its
// file path contains "ego-component/emongo" and either ends with
// "wrapped_client.go" or contains "wrapped_collection.go", unless the path
// ends with "_test.go".
func fileWithLineNum() string {
	const (
		firstDepth = 2 // depth 0 is this function, depth 1 its direct caller
		depthLimit = 15
	)
	for depth := firstDepth; depth < depthLimit; depth++ {
		_, path, lineNo, found := runtime.Caller(depth)
		if !found {
			return ""
		}

		inPackage := strings.Contains(path, "ego-component/emongo")
		isWrapper := inPackage &&
			(strings.HasSuffix(path, "wrapped_client.go") || strings.Contains(path, "wrapped_collection.go"))
		if isWrapper && !strings.HasSuffix(path, "_test.go") {
			continue
		}
		return path + ":" + strconv.Itoa(lineNo)
	}
	return ""
}