-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathchannel.go
186 lines (155 loc) · 3.93 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package logf
import (
"os"
"runtime"
"sync"
"time"
)
// ChannelWriterConfig holds the settings used to construct a ChannelWriter.
//
// Unset fields are filled in by WithDefaults.
type ChannelWriterConfig struct {
// Capacity specifies the capacity of the underlying channel of entries.
//
// WithDefaults raises any value below runtime.NumCPU()*2 to that minimum.
Capacity int
// Appender specifies the basic Appender for all Entries.
//
// Default Appender is a write appender on os.Stdout with the default
// JSON Encoder.
Appender Appender
// ErrorAppender specifies the Appender that receives the errors returned
// by the basic Appender.
//
// Default ErrorAppender is the one returned by NewDiscardAppender.
ErrorAppender Appender
// EnableSyncOnError specifies whether Appender.Sync should be called
// after appending an entry whose Level is <= LevelError.
//
// Default value is false.
EnableSyncOnError bool
}
// WithDefaults returns a copy of the config in which every unset field has
// been replaced with its default value. The receiver is not modified.
func (c ChannelWriterConfig) WithDefaults() ChannelWriterConfig {
	// Channel efficiency depends on the number of CPUs in the system; tests
	// show the capacity should be at least twice the CPU count, so smaller
	// values are raised to that floor.
	if floor := runtime.NumCPU() * 2; c.Capacity < floor {
		c.Capacity = floor
	}

	// Errors from the basic appender are discarded unless the caller
	// provided an ErrorAppender.
	if c.ErrorAppender == nil {
		c.ErrorAppender = NewDiscardAppender()
	}

	// Without an explicit appender, entries are JSON-encoded and written
	// to stdout.
	if c.Appender == nil {
		c.Appender = NewWriteAppender(os.Stdout, NewJSONEncoder.Default())
	}

	return c
}
// NewChannelWriter creates a channel-backed EntryWriter from the given config
// (after applying WithDefaults) and starts its worker goroutine. The returned
// ChannelWriterCloseFunc closes the writer and waits for the worker to exit.
var NewChannelWriter = channelWriterGetter(
	func(cfg ChannelWriterConfig) (EntryWriter, ChannelWriterCloseFunc) {
		w := &channelWriter{}
		w.init(cfg.WithDefaults())

		// The bound method value serves directly as the close function.
		return w, w.close
	},
)
// ChannelWriterCloseFunc allows to close a channel writer.
//
// The function returned by NewChannelWriter closes the entry channel and
// waits for the worker goroutine to finish; calling it more than once is
// allowed.
type ChannelWriterCloseFunc func()
// channelWriter is the EntryWriter returned by NewChannelWriter. Entries are
// sent over a buffered channel and appended by a worker goroutine started in
// init.
type channelWriter struct {
// ChannelWriterConfig is the configuration stored by init.
ChannelWriterConfig
// Mutex is held by close for its whole duration.
sync.Mutex
// WaitGroup tracks the worker goroutine: init adds one and worker calls
// Done when it returns.
sync.WaitGroup
// ch carries entries from WriteEntry to the worker goroutine.
ch chan Entry
// closed is set by close after the channel has been closed and the worker
// has exited.
closed bool
}
// WriteEntry sends e over the entry channel to the worker goroutine.
//
// The send blocks while the channel buffer is full. Calling WriteEntry after
// close panics, because the channel has been closed.
func (l *channelWriter) WriteEntry(e Entry) {
l.ch <- e
}
// init stores cfg, allocates the entry channel with the configured capacity
// and launches the worker goroutine, registering it with the wait group so
// that close can wait for it.
func (l *channelWriter) init(cfg ChannelWriterConfig) {
	l.ChannelWriterConfig = cfg
	l.ch = make(chan Entry, cfg.Capacity)

	// Register the worker before starting it so Wait cannot miss it.
	l.WaitGroup.Add(1)
	go l.worker()
}
// close closes the entry channel and blocks until the worker goroutine has
// drained it and returned. Calling close again is a no-op.
func (l *channelWriter) close() {
	l.Lock()
	defer l.Unlock()

	// Double close is allowed.
	if l.closed {
		return
	}

	close(l.ch)
	l.Wait()

	// Mark the channel as closed and drained. The channel is deliberately
	// not reset to nil: a send on the closed channel gives a built-in panic
	// if WriteEntry is called after close.
	l.closed = true
}
// worker receives entries from the channel and appends them until the channel
// is closed and drained. Whenever the channel is found empty the appender is
// flushed before blocking for the next entry. On exit the appender is flushed
// and synced, and the wait group is released.
func (l *channelWriter) worker() {
	defer l.Done()

	var (
		entry Entry
		open  bool
	)
	for {
		select {
		case entry, open = <-l.ch:
			// An entry (or the close signal) was immediately available.
		default:
			// Nothing is queued: flush what has been buffered so far, then
			// block until the next entry arrives or the channel is closed.
			l.flush()
			entry, open = <-l.ch
		}

		if !open {
			break
		}
		l.append(entry)
	}

	// Final flush & sync before the goroutine exits.
	l.flush()
	l.sync()
}
// flush calls Flush on the basic appender and forwards any error to
// reportError.
func (l *channelWriter) flush() {
	if err := l.Appender.Flush(); err != nil {
		l.reportError("logf: failed to flush appender", err)
	}
}
// sync calls Sync on the basic appender and forwards any error to
// reportError.
func (l *channelWriter) sync() {
	if err := l.Appender.Sync(); err != nil {
		l.reportError("logf: failed to sync appender", err)
	}
}
// append passes e to the basic appender, reporting any error via reportError.
// For entries with Level <= LevelError it additionally flushes the appender
// and, when EnableSyncOnError is set, syncs it.
func (l *channelWriter) append(e Entry) {
	if err := l.Appender.Append(e); err != nil {
		l.reportError("logf: failed to append entry", err)
	}

	if e.Level > LevelError {
		return
	}

	// The entry carries an error message: commit buffered messages now so
	// they are not lost to a subsequent unexpected panic or crash.
	l.flush()
	if l.EnableSyncOnError {
		l.sync()
	}
}
// reportError builds an error entry from text and err, appends it to the
// ErrorAppender, then flushes and syncs that appender. Errors returned by the
// ErrorAppender itself are ignored.
func (l *channelWriter) reportError(text string, err error) {
	sink := l.ErrorAppender

	skipError(sink.Append(newErrorEntry(text, Error(err))))
	skipError(sink.Flush())
	skipError(sink.Sync())
}
// skipError explicitly discards an error. It marks call sites where ignoring
// the error is intentional.
func skipError(error) {}
// newErrorEntry returns an Entry at LevelError with LoggerID -1, the current
// time, the given text and the given fields.
func newErrorEntry(text string, fs ...Field) Entry {
	var entry Entry

	entry.LoggerID = -1
	entry.Level = LevelError
	entry.Time = time.Now()
	entry.Text = text
	entry.Fields = fs

	return entry
}
// channelWriterGetter is the function type of NewChannelWriter. Declaring it
// as a named type lets the constructor carry the Default method.
type channelWriterGetter func(cfg ChannelWriterConfig) (EntryWriter, ChannelWriterCloseFunc)
// Default invokes the getter with a zero-valued ChannelWriterConfig and
// returns its results.
func (c channelWriterGetter) Default() (EntryWriter, ChannelWriterCloseFunc) {
	var cfg ChannelWriterConfig
	return c(cfg)
}