Skip to content

Commit

Permalink
Change spooler timeout flush to happen after lastFlush + timeout (#2371)
Browse files Browse the repository at this point in the history
In the previous code, the flush always happened after timeout, independent if a flush just happened before. This could lead to unnecessary flushes. In case the spooler is full and data is flushed, the ticker will be reset.

Previously the ticker was set to timeout/2. Now it is timeout.
  • Loading branch information
ruflin authored and Steffen Siering committed Aug 26, 2016
1 parent 10f7aab commit 2a965a2
Showing 1 changed file with 28 additions and 28 deletions.
56 changes: 28 additions & 28 deletions filebeat/spooler/spooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ const channelSize = 16

// Spooler aggregates the events and sends the aggregated data to the publisher.
type Spooler struct {
Channel chan *input.Event // Channel is the input to the Spooler.
config spoolerConfig
exit chan struct{} // Channel used to signal shutdown.
nextFlushTime time.Time // Scheduled time of the next flush.
publisher chan<- []*input.Event // Channel used to publish events.
spool []*input.Event // Events being held by the Spooler.
wg sync.WaitGroup // WaitGroup used to control the shutdown.
Channel chan *input.Event // Channel is the input to the Spooler.
config spoolerConfig
exit chan struct{} // Channel used to signal shutdown.
publisher chan<- []*input.Event // Channel used to publish events.
spool []*input.Event // Events being held by the Spooler.
wg sync.WaitGroup // WaitGroup used to control the shutdown.
}

type spoolerConfig struct {
Expand All @@ -43,10 +42,9 @@ func New(
idleTimeout: config.IdleTimeout,
spoolSize: config.SpoolSize,
},
exit: make(chan struct{}),
nextFlushTime: time.Now().Add(config.IdleTimeout),
publisher: publisher,
spool: make([]*input.Event, 0, config.SpoolSize),
exit: make(chan struct{}),
publisher: publisher,
spool: make([]*input.Event, 0, config.SpoolSize),
}, nil
}

Expand All @@ -61,23 +59,32 @@ func (s *Spooler) Start() {
func (s *Spooler) run() {
defer s.wg.Done()

ticker := time.NewTicker(s.config.idleTimeout / 2)

logp.Info("Starting spooler: spool_size: %v; idle_timeout: %s",
s.config.spoolSize, s.config.idleTimeout)

timer := time.NewTimer(s.config.idleTimeout)

loop:
for {
select {
case <-s.exit:
ticker.Stop()
timer.Stop()
break loop
case event := <-s.Channel:
if event != nil {
s.queue(event)
flushed := s.queue(event)
if flushed {
// Stop timer and drain channel. See https://golang.org/pkg/time/#Timer.Reset
if !timer.Stop() {
<-timer.C
}
timer.Reset(s.config.idleTimeout)
}
}
case <-ticker.C:
s.timedFlush()
case <-timer.C:
debugf("Flushing spooler because of timeout. Events flushed: %v", len(s.spool))
s.flush()
timer.Reset(s.config.idleTimeout)
}
}

Expand Down Expand Up @@ -109,21 +116,15 @@ func (s *Spooler) Stop() {
// queue queues a single event to be spooled. If the queue reaches spoolSize
// while calling this method then all events in the queue will be flushed to
// the publisher.
func (s *Spooler) queue(event *input.Event) {
func (s *Spooler) queue(event *input.Event) bool {
flushed := false
s.spool = append(s.spool, event)
if len(s.spool) == cap(s.spool) {
debugf("Flushing spooler because spooler full. Events flushed: %v", len(s.spool))
s.flush()
flushed = true
}
}

// timedFlush flushes the events in the queue if a flush has not occurred
// for a period of time greater than idleTimeout.
func (s *Spooler) timedFlush() {
if time.Now().After(s.nextFlushTime) {
debugf("Flushing spooler because of timeout. Events flushed: %v", len(s.spool))
s.flush()
}
return flushed
}

// flush flushes all events to the publisher.
Expand All @@ -141,5 +142,4 @@ func (s *Spooler) flush() {
case s.publisher <- tmpCopy: // send
}
}
s.nextFlushTime = time.Now().Add(s.config.idleTimeout)
}

0 comments on commit 2a965a2

Please sign in to comment.