-
Notifications
You must be signed in to change notification settings - Fork 23
/
reader.go
135 lines (115 loc) · 3.3 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package shuttle
import (
"bufio"
"io"
"sync"
"time"
"github.com/rcrowley/go-metrics"
)
// LogLineReader reads delimiter-terminated lines from an io.ReadCloser, wraps
// each one in a LogLine, collects them into a Batch and sends finished batches
// on the out channel.
type LogLineReader struct {
	input     io.ReadCloser // source the lines are read from
	out       chan<- Batch  // destination for finished batches
	close     chan struct{} // closed by ReadLines on a read error; stops expireBatches
	batchSize int           // argument handed to NewBatch for every new batch
	timeOut   time.Duration // duration the batch timer is reset to
	timer     *time.Timer   // fires to trigger a flush of the current batch
	drops     *Counter      // receives the number of lines in each dropped batch
	drop      bool          // true: drop a batch when out is not ready; false: block

	// Metrics, registered in the Shuttle's metrics registry.
	linesRead         metrics.Counter // lines read from input
	linesBatchedCount metrics.Counter // lines sent on out
	linesDroppedCount metrics.Counter // lines in batches that were dropped
	batchFillTime     metrics.Timer   // duration reported for each flushed batch

	mu sync.Mutex // protects access to below
	b  Batch      // batch currently being filled
}
// NewLogLineReader builds a LogLineReader that reads from input and sends its
// batches to s.Batches. Batch size, flush timeout and drop behaviour are taken
// from the Shuttle's config, and the metrics are looked up (or registered) in
// s.MetricsRegistry. It also starts the goroutine that flushes the current
// batch whenever the batch timer fires.
func NewLogLineReader(input io.ReadCloser, s *Shuttle) *LogLineReader {
	// The timer is created stopped: it only needs to run while the current
	// batch actually contains log lines.
	batchTimer := time.NewTimer(time.Second)
	batchTimer.Stop()

	registry := s.MetricsRegistry

	rdr := &LogLineReader{
		input:     input,
		out:       s.Batches,
		close:     make(chan struct{}),
		batchSize: s.config.BatchSize,
		timeOut:   s.config.WaitDuration,
		timer:     batchTimer,
		drops:     s.Drops,
		drop:      s.config.Drop,

		linesRead:         metrics.GetOrRegisterCounter("lines.read", registry),
		linesBatchedCount: metrics.GetOrRegisterCounter("lines.batched", registry),
		linesDroppedCount: metrics.GetOrRegisterCounter("lines.dropped", registry),
		batchFillTime:     metrics.GetOrRegisterTimer("batch.fill", registry),

		b: NewBatch(s.config.BatchSize),
	}

	go rdr.expireBatches()

	return rdr
}
// expireBatches flushes the current batch every time the batch timer fires,
// reporting rdr.timeOut as the fill duration. It loops until rdr.close is
// closed, which ReadLines does when it hits a read error.
func (rdr *LogLineReader) expireBatches() {
	// flushExpired takes the lock because deliverOrDropCurrent requires
	// rdr.mu to be held by its caller.
	flushExpired := func() {
		rdr.mu.Lock()
		rdr.deliverOrDropCurrent(rdr.timeOut)
		rdr.mu.Unlock()
	}

	for {
		select {
		case <-rdr.timer.C:
			flushExpired()
		case <-rdr.close:
			return
		}
	}
}
// Close closes the underlying input and returns whatever error that yields.
func (rdr *LogLineReader) Close() error {
	err := rdr.input.Close()
	return err
}
// ReadLines reads '\n'-delimited lines from the input and adds each one to the
// current batch. A batch is delivered as soon as Add reports it full, and the
// batch timer is restarted whenever a line is the first one in a batch.
//
// It blocks until reading fails. On the first read error any pending lines are
// flushed, the expireBatches goroutine is told to stop, and the error is
// returned.
func (rdr *LogLineReader) ReadLines() error {
	reader := bufio.NewReader(rdr.input)
	batchStart := time.Now() // when the batch being filled was started

	for {
		line, err := reader.ReadBytes('\n')

		// Data can be returned alongside an error (e.g. a final line with no
		// trailing newline), so handle the line before looking at err.
		if len(line) != 0 {
			readAt := time.Now()
			rdr.linesRead.Inc(1)

			rdr.mu.Lock()
			if rdr.b.Add(LogLine{line, readAt}) {
				// Add's result is treated as "the batch is now full".
				rdr.deliverOrDropCurrent(time.Since(batchStart))
			}
			if rdr.b.MsgCount() == 1 {
				// First line of a batch: start timing it and arm the timeout.
				batchStart = time.Now()
				rdr.timer.Reset(rdr.timeOut)
			}
			rdr.mu.Unlock()
		}

		if err == nil {
			continue
		}

		// Reading is over: flush what is left and stop the expiry goroutine.
		rdr.mu.Lock()
		rdr.deliverOrDropCurrent(time.Since(batchStart))
		rdr.mu.Unlock()
		close(rdr.close)
		return err
	}
}
// deliverOrDropCurrent stops the batch timer and, if the current batch holds
// any lines, sends it on rdr.out, records d as the batch fill time and starts
// a fresh batch. When rdr.drop is set the send is non-blocking and a batch
// that cannot be sent is counted as dropped; otherwise the send blocks until
// the batch is accepted.
//
// Should only be called when rdr.mu is held.
func (rdr *LogLineReader) deliverOrDropCurrent(d time.Duration) {
	rdr.timer.Stop()

	// The timer goroutine and ReadLines can both end up here for the same
	// batch, so there may be nothing left to send; never queue an empty batch.
	count := rdr.b.MsgCount()
	if count <= 0 {
		return
	}

	delivered := true
	if rdr.drop {
		select {
		case rdr.out <- rdr.b:
		default:
			delivered = false
		}
	} else {
		rdr.out <- rdr.b
	}

	if delivered {
		rdr.linesBatchedCount.Inc(int64(count))
	} else {
		rdr.linesDroppedCount.Inc(int64(count))
		rdr.drops.Add(count)
	}

	rdr.batchFillTime.Update(d)
	rdr.b = NewBatch(rdr.batchSize)
}