Skip to content

Commit

Permalink
Merge pull request #1047 from ploxiln/statsd_exit
Browse files Browse the repository at this point in the history
nsqd: make statsd SpreadWriter use exit chan
  • Loading branch information
mreiferson authored Jul 4, 2018
2 parents bf74c94 + f79f4ec commit e3897d9
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 6 deletions.
13 changes: 9 additions & 4 deletions internal/writers/spread_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ type SpreadWriter struct {
w io.Writer
interval time.Duration
buf [][]byte
exitCh chan int
}

func NewSpreadWriter(w io.Writer, interval time.Duration) *SpreadWriter {
func NewSpreadWriter(w io.Writer, interval time.Duration, exitCh chan int) *SpreadWriter {
return &SpreadWriter{
w: w,
interval: interval,
buf: make([][]byte, 0),
exitCh: exitCh,
}
}

Expand All @@ -28,11 +30,14 @@ func (s *SpreadWriter) Write(p []byte) (int, error) {

func (s *SpreadWriter) Flush() {
sleep := s.interval / time.Duration(len(s.buf))
ticker := time.NewTicker(sleep)
for _, b := range s.buf {
start := time.Now()
s.w.Write(b)
latency := time.Now().Sub(start)
time.Sleep(sleep - latency)
select {
case <-ticker.C:
case <-s.exitCh: // skip sleeps finish writes
}
}
ticker.Stop()
s.buf = s.buf[:0]
}
3 changes: 2 additions & 1 deletion nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,10 +473,11 @@ func (n *NSQD) Exit() {
}
n.Unlock()

n.logf(LOG_INFO, "NSQ: stopping subsystems")
close(n.exitChan)
n.waitGroup.Wait()

n.dl.Unlock()
n.logf(LOG_INFO, "NSQ: bye")
}

// GetTopic performs a thread safe operation
Expand Down
3 changes: 2 additions & 1 deletion nsqd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (n *NSQD) statsdLoop() {
n.logf(LOG_ERROR, "failed to create UDP socket to statsd(%s)", addr)
continue
}
sw := writers.NewSpreadWriter(conn, interval-time.Second)
sw := writers.NewSpreadWriter(conn, interval-time.Second, n.exitChan)
bw := writers.NewBoundaryBufferedWriter(sw, n.getOpts().StatsdUDPPacketSize)
client := statsd.NewClient(bw, prefix)

Expand Down Expand Up @@ -143,6 +143,7 @@ func (n *NSQD) statsdLoop() {

exit:
ticker.Stop()
n.logf(LOG_INFO, "STATSD: closing")
}

func percentile(perc float64, arr []uint64, length int) uint64 {
Expand Down

0 comments on commit e3897d9

Please sign in to comment.