Skip to content

Commit

Permalink
implement review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ruflin committed Apr 13, 2017
1 parent 92a2ef5 commit f8f540b
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,12 @@ func (l *Log) Run() {
// os.Stat will return an error in case the file does not exist
stat, err := os.Stat(state.Source)
if err != nil {
l.removeState(state)
logp.Debug("prospector", "Remove state for file as file removed: %s", state.Source)
if os.IsNotExist(err) {
l.removeState(state)
logp.Debug("prospector", "Remove state for file as file removed: %s", state.Source)
} else {
logp.Err("Prospector state for %s was not removed: %s", state.Source, err)
}
} else {
// Check if existing source on disk and state are the same. Remove if not the case.
newState := file.NewState(stat, state.Source)
Expand All @@ -111,16 +115,19 @@ func (l *Log) Run() {
}

func (l *Log) removeState(state file.State) {

// Only clean up files where state is Finished
if state.Finished {
state.TTL = 0
err := l.Prospector.updateState(input.NewEvent(state))
if err != nil {
logp.Err("File cleanup state update error: %s", err)
}
} else {
if !state.Finished {
logp.Debug("prospector", "State for file not removed because harvester not finished: %s", state.Source)
return
}

state.TTL = 0
err := l.Prospector.updateState(input.NewEvent(state))
if err != nil {
logp.Err("File cleanup state update error: %s", err)
}

}

// getFiles returns all files which have to be harvested
Expand Down

0 comments on commit f8f540b

Please sign in to comment.