Skip to content

Commit

Permalink
Apply close_timeout also when output is blocked (#3511)
Browse files Browse the repository at this point in the history
Currently `close_timeout` does not apply in case the output is blocked. This PR changes the behavior of `close_timeout` to also close a file handler when the output is blocked.

It is important to note, that this closes the file handler but NOT the harvester. This is important as the closing of the harvester requires a state update to set `state.Finished=true`. If this would not happen and the harvester is closed, processing would not continue when the output becomes available again.

Previously the internal state of a harvester was updated when the event was created. If the event was then not sent but the state update went through, that event would be missing from the output. This is now prevented by updating the internal state only after the event has been successfully sent.

The done channels from prospector and harvester are renamed to make it more obvious which one belongs to what: h.done -> h.prospectorDone, h.harvestDone -> h.done. As the harvester channel is closed via the `stop` method in all cases, `h.done` is sufficient in most places.

This PR does not solve the problem related to reloading and stopping a harvester mentioned in #3511 (comment) related to reloading. This will be done in a follow up PR.
  • Loading branch information
ruflin authored and Steffen Siering committed Feb 7, 2017
1 parent 04f7147 commit 72ff178
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- Add the `pipeline` config option at the prospector level, for configuring the Ingest Node pipeline ID. {pull}3433[3433]
- Update regular expressions used for matching file names or lines (multiline, include/exclude functionality) to new matchers improving performance of simple string matches. {pull}3469[3469]
- The `symlinks` and `harvester_limit` settings are now GA, instead of experimental. {pull}3525[3525]
- close_timeout is also applied when the output is blocking. {pull}3511[3511]

*Heartbeat*

Expand Down
6 changes: 5 additions & 1 deletion filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package harvester
import (
"errors"
"fmt"
"sync"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
Expand All @@ -39,6 +40,8 @@ type Harvester struct {
fileReader *LogFile
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
prospectorDone chan struct{}
once sync.Once
done chan struct{}
}

Expand All @@ -53,7 +56,8 @@ func NewHarvester(
config: defaultConfig,
state: state,
prospectorChan: prospectorChan,
done: done,
prospectorDone: done,
done: make(chan struct{}),
}

if err := cfg.Unpack(&h.config); err != nil {
Expand Down
65 changes: 42 additions & 23 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,26 +58,29 @@ func (h *Harvester) Harvest(r reader.Reader) {
defer h.close()

// Channel to stop internal harvester routines
harvestDone := make(chan struct{})
defer close(harvestDone)
defer h.stop()

// Closes reader after timeout or when done channel is closed
// This routine is also responsible to properly stop the reader
go func() {
var closeTimeout <-chan time.Time

closeTimeout := make(<-chan time.Time)
// starts close_timeout timer
if h.config.CloseTimeout > 0 {
closeTimeout = time.After(h.config.CloseTimeout)
}

select {
// Applies when timeout is reached
case <-closeTimeout:
logp.Info("Closing harvester because close_timeout was reached: %s", h.state.Source)
logp.Info("Closing harvester because close_timeout was reached.")
// Required for shutdown when hanging inside reader
case <-h.done:
case <-h.prospectorDone:
// Required when reader loop returns and reader finished
case <-harvestDone:
case <-h.done:
}

h.stop()
h.fileReader.Close()
}()

Expand Down Expand Up @@ -122,9 +125,10 @@ func (h *Harvester) Harvest(r reader.Reader) {
// Update offset
h.state.Offset += int64(message.Bytes)

// Create state event
event := input.NewEvent(h.getState())
state := h.getState()

// Create state event
event := input.NewEvent(state)
text := string(message.Content)

// Check if data should be added to event. Only export non empty events.
Expand All @@ -147,12 +151,21 @@ func (h *Harvester) Harvest(r reader.Reader) {
if !h.sendEvent(event) {
return
}
// Update state of harvester as successfully sent
h.state = state
}
}

// stop closes the harvester's done channel. It is safe to call from
// multiple code paths concurrently: sync.Once guarantees the channel
// is closed exactly once, so repeated calls never panic.
func (h *Harvester) stop() {
	h.once.Do(func() { close(h.done) })
}

// sendEvent sends event to the spooler channel
// Return false if event was not sent
func (h *Harvester) sendEvent(event *input.Event) bool {

select {
case <-h.done:
return false
Expand All @@ -161,6 +174,21 @@ func (h *Harvester) sendEvent(event *input.Event) bool {
}
}

// sendStateUpdate publishes an empty event carrying the current harvester
// state so the registry gets updated. close_timeout deliberately does not
// apply here: while the output is blocked the harvester must stay open so
// that no duplicate harvester is started for the same file. Once the output
// becomes available again, the finished state is written and processing
// can continue.
func (h *Harvester) sendStateUpdate() {
	logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset)

	stateEvent := input.NewEvent(h.state)
	select {
	case h.prospectorChan <- stateEvent: // ship the new event downstream
	case <-h.prospectorDone:
	}
}

// shouldExportLine decides if the line is exported or not based on
// the include_lines and exclude_lines options.
func (h *Harvester) shouldExportLine(line string) bool {
Expand Down Expand Up @@ -260,22 +288,18 @@ func (h *Harvester) initFileOffset(file *os.File) (int64, error) {
return file.Seek(0, os.SEEK_CUR)
}

// sendStateUpdate send an empty event with the current state to update the registry
func (h *Harvester) sendStateUpdate() bool {
logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset)
event := input.NewEvent(h.getState())
return h.sendEvent(event)
}

// getState returns an updated copy of the harvester state. The scraped diff
// interleaved the removed lines (mutating h.state and returning it) with the
// added ones, leaving unreachable duplicate statements after the first
// return; this is the reconstructed post-commit version, which updates a
// local copy so h.state itself is only overwritten after a successful send.
func (h *Harvester) getState() file.State {
	// Stdin has no file-backed state to report.
	if h.config.InputType == config.StdinInputType {
		return file.State{}
	}

	state := h.state

	// refreshes the values in State with the values from the harvester itself
	state.FileStateOS = file.GetOSState(h.state.Fileinfo)
	return state
}

func (h *Harvester) close() {
Expand All @@ -289,6 +313,7 @@ func (h *Harvester) close() {
// If file was never opened, it can't be closed
if h.file != nil {

// close file handler
h.file.Close()

logp.Debug("harvester", "Closing file: %s", h.state.Source)
Expand Down Expand Up @@ -350,9 +375,3 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {

return reader.NewLimit(r, h.config.MaxBytes), nil
}

/*
TODO: introduce new structure: log_file —[raw bytes]—> (line —[utf8 bytes]—> encode) —[message]—> …`
*/
1 change: 0 additions & 1 deletion filebeat/harvester/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func TestReadLine(t *testing.T) {
},
file: f,
}
assert.NotNil(t, h)

var ok bool
h.encodingFactory, ok = encoding.FindEncoding(h.config.Encoding)
Expand Down
6 changes: 3 additions & 3 deletions filebeat/tests/system/test_multiline.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def test_close_timeout_with_multiline(self):
pattern="^\[",
negate="true",
match="after",
close_timeout="1s",
close_timeout="2s",
)

os.mkdir(self.working_dir + "/log/")
Expand Down Expand Up @@ -286,7 +286,7 @@ def test_close_timeout_with_multiline(self):
# close_timeout must have closed the reader exactly twice
self.wait_until(
lambda: self.log_contains_count(
"Closing harvester because close_timeout was reached") == 2,
"Closing harvester because close_timeout was reached") >= 1,
max_timeout=15)

output = self.read_output()
Expand All @@ -302,7 +302,7 @@ def test_consecutive_newline(self):
pattern="^\[",
negate="true",
match="after",
close_timeout="1s",
close_timeout="2s",
)

logentry1 = """[2016-09-02 19:54:23 +0000] Started 2016-09-02 19:54:23 +0000 "GET" for /gaq?path=%2FCA%2FFallbrook%2F1845-Acacia-Ln&referer=http%3A%2F%2Fwww.xxxxx.com%2FAcacia%2BLn%2BFallbrook%2BCA%2Baddresses&search_bucket=none&page_controller=v9%2Faddresses&page_action=show at 23.235.47.31
Expand Down

0 comments on commit 72ff178

Please sign in to comment.