Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Feature/extra template context #95

Merged
merged 6 commits into from
May 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion AUTHORS
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
Fabian Stäber <fabian@fstab.de>
Fabian Stäber <fabian@fstab.de>
Emil Madsen <emil@magenta.dk>
26 changes: 25 additions & 1 deletion CONFIG.md
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,13 @@ This simple example shows a one-to-one mapping of a Grok field to a Prometheus l

### Pre-Defined Label Variables

`logfile` is currently the only pre-defined label variable that is independent of the Grok patterns. The `logfile` variable is always present for input type `file`, and contains the full path to the log file the line was read from. You can use it like this:
Two pre-defined label variables that are independent of the Grok patterns are available:
* `logfile`: Which contains the full path of the log file the line was read from (for input type `file`).
* `extra`: Which contains the entire JSON object parsed from the input (for input type `webhook`, with format=`json_*`).

#### logfile
The `logfile` variable is always present for input type `file`, and contains the full path to the log file the line was read from.
You can use it like this:

```yaml
match: '%{DATE} %{TIME} %{USER:user} %{NUMBER:val}'
Expand All @@ -354,6 +360,24 @@ labels:

If you don't want the full path but only the file name, you can use the `base` template function, see next section.

#### extra
The `extra` variable is always present for input type `webhook` with format being either `json_single` or `json_bulk`.
It contains the entire JSON object that was parsed.
You can use it like this:

```yaml
match: 'Login occurred'
labels:
    user: '{{ index .extra "user" }}'
    ip: '{{ index .extra "ip" }}'
```

With the incoming log object being:

```json
{"message": "Login occurred", "user": "Skeen", "ip": "1.1.1.1"}
```


### Label Template Functions

Label values are defined as [Go templates]. `grok_exporter` supports the following template functions: `gsub`, `base`, `add`, `subtract`, `multiply`, `divide`.
Expand Down
46 changes: 23 additions & 23 deletions exporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ type Metric interface {

PathMatches(logfilePath string) bool
// Returns the match if the line matched, and nil if the line didn't match.
ProcessMatch(line string, additionalFields map[string]string) (*Match, error)
ProcessMatch(line string, additionalFields map[string]interface{}) (*Match, error)
// Returns the match if the delete pattern matched, nil otherwise.
ProcessDeleteMatch(line string, additionalFields map[string]string) (*Match, error)
ProcessDeleteMatch(line string, additionalFields map[string]interface{}) (*Match, error)
// Remove old metrics
ProcessRetention() error
}
Expand Down Expand Up @@ -199,7 +199,7 @@ func (m *observeMetric) processMatch(line string, cb func(value float64)) (*Matc
}
}

func (m *metricWithLabels) processMatch(line string, additionalFields map[string]string, callback func(labels map[string]string)) (*Match, error) {
func (m *metricWithLabels) processMatch(line string, additionalFields map[string]interface{}, callback func(labels map[string]string)) (*Match, error) {
searchResult, err := m.regex.Search(line)
if err != nil {
return nil, fmt.Errorf("error while processing metric %v: %v", m.Name(), err.Error())
Expand All @@ -221,7 +221,7 @@ func (m *metricWithLabels) processMatch(line string, additionalFields map[string
}
}

func (m *observeMetricWithLabels) processMatch(line string, additionalFields map[string]string, callback func(value float64, labels map[string]string)) (*Match, error) {
func (m *observeMetricWithLabels) processMatch(line string, additionalFields map[string]interface{}, callback func(value float64, labels map[string]string)) (*Match, error) {
searchResult, err := m.regex.Search(line)
if err != nil {
return nil, fmt.Errorf("error processing metric %v: %v", m.Name(), err.Error())
Expand All @@ -247,7 +247,7 @@ func (m *observeMetricWithLabels) processMatch(line string, additionalFields map
}
}

func (m *metric) ProcessDeleteMatch(line string, additionalFields map[string]string) (*Match, error) {
func (m *metric) ProcessDeleteMatch(line string, additionalFields map[string]interface{}) (*Match, error) {
if m.deleteRegex == nil {
return nil, nil
}
Expand All @@ -261,7 +261,7 @@ func (m *metric) ProcessRetention() error {
return fmt.Errorf("error processing metric %v: retention is currently only supported for metrics with labels.", m.Name())
}

func (m *metricWithLabels) processDeleteMatch(line string, vec deleterMetric, additionalFields map[string]string) (*Match, error) {
func (m *metricWithLabels) processDeleteMatch(line string, vec deleterMetric, additionalFields map[string]interface{}) (*Match, error) {
if m.deleteRegex == nil {
return nil, nil
}
Expand Down Expand Up @@ -299,27 +299,27 @@ func (m *metricWithLabels) processRetention(vec deleterMetric) error {
return nil
}

func (m *counterMetric) ProcessMatch(line string, additionalFields map[string]string) (*Match, error) {
func (m *counterMetric) ProcessMatch(line string, additionalFields map[string]interface{}) (*Match, error) {
return m.processMatch(line, func() {
m.counter.Inc()
})
}

func (m *counterVecMetric) ProcessMatch(line string, additionalFields map[string]string) (*Match, error) {
func (m *counterVecMetric) ProcessMatch(line string, additionalFields map[string]interface{}) (*Match, error) {
return m.processMatch(line, additionalFields, func(labels map[string]string) {
m.counterVec.With(labels).Inc()
})
}

func (m *counterVecMetric) ProcessDeleteMatch(line string, additionalFields map[string]string) (*Match, error) {
func (m *counterVecMetric) ProcessDeleteMatch(line string, additionalFields map[string]interface{}) (*Match, error) {
return m.processDeleteMatch(line, m.counterVec, additionalFields)
}

func (m *counterVecMetric) ProcessRetention() error {
return m.processRetention(m.counterVec)
}

func (m *gaugeMetric) ProcessMatch(line string, additionalFields map[string]string) (*Match, error) {
func (m *gaugeMetric) ProcessMatch(line string, additionalFields map[string]interface{}) (*Match, error) {
return m.processMatch(line, func(value float64) {
if m.cumulative {
m.gauge.Add(value)
Expand All @@ -329,7 +329,7 @@ func (m *gaugeMetric) ProcessMatch(line string, additionalFields map[string]stri
})
}

func (m *gaugeVecMetric) ProcessMatch(line string, additionalFields map[string]string) (*Match, error) {
func (m *gaugeVecMetric) ProcessMatch(line string, additionalFields map[string]interface{}) (*Match, error) {
return m.processMatch(line, additionalFields, func(value float64, labels map[string]string) {
if m.cumulative {
m.gaugeVec.With(labels).Add(value)
Expand All @@ -339,47 +339,47 @@ func (m *gaugeVecMetric) ProcessMatch(line string, additionalFields map[string]s
})
}

func (m *gaugeVecMetric) ProcessDeleteMatch(line string, additionalFields map[string]string) (*Match, error) {
func (m *gaugeVecMetric) ProcessDeleteMatch(line string, additionalFields map[string]interface{}) (*Match, error) {
return m.processDeleteMatch(line, m.gaugeVec, additionalFields)
}

func (m *gaugeVecMetric) ProcessRetention() error {
return m.processRetention(m.gaugeVec)
}

func (m *histogramMetric) ProcessMatch(line string, additionalFields map[string]string) (*Match, error) {
func (m *histogramMetric) ProcessMatch(line string, additionalFields map[string]interface{}) (*Match, error) {
return m.processMatch(line, func(value float64) {
m.histogram.Observe(value)
})
}

func (m *histogramVecMetric) ProcessMatch(line string, additionalFields map[string]string) (*Match, error) {
func (m *histogramVecMetric) ProcessMatch(line string, additionalFields map[string]interface{}) (*Match, error) {
return m.processMatch(line, additionalFields, func(value float64, labels map[string]string) {
m.histogramVec.With(labels).Observe(value)
})
}

func (m *histogramVecMetric) ProcessDeleteMatch(line string, additionalFields map[string]string) (*Match, error) {
func (m *histogramVecMetric) ProcessDeleteMatch(line string, additionalFields map[string]interface{}) (*Match, error) {
return m.processDeleteMatch(line, m.histogramVec, additionalFields)
}

func (m *histogramVecMetric) ProcessRetention() error {
return m.processRetention(m.histogramVec)
}

func (m *summaryMetric) ProcessMatch(line string, additionalFields map[string]string) (*Match, error) {
func (m *summaryMetric) ProcessMatch(line string, additionalFields map[string]interface{}) (*Match, error) {
return m.processMatch(line, func(value float64) {
m.summary.Observe(value)
})
}

func (m *summaryVecMetric) ProcessMatch(line string, additionalFields map[string]string) (*Match, error) {
func (m *summaryVecMetric) ProcessMatch(line string, additionalFields map[string]interface{}) (*Match, error) {
return m.processMatch(line, additionalFields, func(value float64, labels map[string]string) {
m.summaryVec.With(labels).Observe(value)
})
}

func (m *summaryVecMetric) ProcessDeleteMatch(line string, additionalFields map[string]string) (*Match, error) {
func (m *summaryVecMetric) ProcessDeleteMatch(line string, additionalFields map[string]interface{}) (*Match, error) {
return m.processDeleteMatch(line, m.summaryVec, additionalFields)
}

Expand Down Expand Up @@ -500,7 +500,7 @@ func NewSummaryMetric(cfg *configuration.MetricConfig, regex *oniguruma.Regex, d
}
}

func labelValues(metricName string, searchResult *oniguruma.SearchResult, templates []template.Template, additionalFields map[string]string) (map[string]string, error) {
func labelValues(metricName string, searchResult *oniguruma.SearchResult, templates []template.Template, additionalFields map[string]interface{}) (map[string]string, error) {
result := make(map[string]string, len(templates))
for _, t := range templates {
value, err := evalTemplate(searchResult, t, additionalFields)
Expand All @@ -512,7 +512,7 @@ func labelValues(metricName string, searchResult *oniguruma.SearchResult, templa
return result, nil
}

func floatValue(metricName string, searchResult *oniguruma.SearchResult, valueTemplate template.Template, additionalFields map[string]string) (float64, error) {
func floatValue(metricName string, searchResult *oniguruma.SearchResult, valueTemplate template.Template, additionalFields map[string]interface{}) (float64, error) {
stringVal, err := evalTemplate(searchResult, valueTemplate, additionalFields)
if err != nil {
return 0, fmt.Errorf("error processing metric %v: %v", metricName, err.Error())
Expand All @@ -524,10 +524,10 @@ func floatValue(metricName string, searchResult *oniguruma.SearchResult, valueTe
return floatVal, nil
}

func evalTemplate(searchResult *oniguruma.SearchResult, t template.Template, additionalFields map[string]string) (string, error) {
func evalTemplate(searchResult *oniguruma.SearchResult, t template.Template, additionalFields map[string]interface{}) (string, error) {
var (
values = make(map[string]string, len(t.ReferencedGrokFields()))
value string
values = make(map[string]interface{}, len(t.ReferencedGrokFields()))
value interface{}
ok bool
err error
field string
Expand Down
4 changes: 2 additions & 2 deletions exporter/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ func TestLogfileLabel(t *testing.T) {
"logfile": "{{.logfile}}",
},
})
logfile1 := map[string]string{
logfile1 := map[string]interface{}{
"logfile": "/var/log/exim-1.log",
}
logfile2 := map[string]string{
logfile2 := map[string]interface{}{
"logfile": "/var/log/exim-2.log",
}
counter := NewCounterMetric(counterCfg, regex, nil)
Expand Down
7 changes: 5 additions & 2 deletions grok_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (

var (
logfile = "logfile"
extra = "extra"
)

const (
Expand All @@ -49,6 +50,7 @@ const (

var additionalFieldDefinitions = map[string]string{
logfile: "full path of the log file",
extra: "full json log object",
}

func main() {
Expand Down Expand Up @@ -162,9 +164,10 @@ func main() {
}
}

func makeAdditionalFields(line *fswatcher.Line) map[string]string {
return map[string]string{
func makeAdditionalFields(line *fswatcher.Line) map[string]interface{} {
return map[string]interface{}{
logfile: line.File,
extra: line.Extra,
}
}

Expand Down
1 change: 1 addition & 0 deletions tailer/fswatcher/fswatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type FileTailer interface {
type Line struct {
Line string
File string
Extra interface{}
}

// ideas how this might look like in the config file:
Expand Down
33 changes: 22 additions & 11 deletions tailer/webhookTailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ import (
"strings"
)

type context_string struct {
// The log line itself
line string
// Optional extra context to be made available to go templating
extra map[string]interface{}
}

type WebhookTailer struct {
lines chan *fswatcher.Line
errors chan fswatcher.Error
Expand Down Expand Up @@ -89,27 +96,31 @@ func (t WebhookTailer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
defer r.Body.Close()

lines := WebhookProcessBody(wts.config, b)
for _, line := range lines {
context_strings := WebhookProcessBody(wts.config, b)
for _, context_string := range context_strings {
logrus.WithFields(logrus.Fields{
"line": line,
"line": context_string.line,
"extra": context_string.extra,
}).Debug("Groking line")
lineChan <- &fswatcher.Line{Line: line}
lineChan <- &fswatcher.Line{Line: context_string.line, Extra: context_string.extra}
}
return
}

func WebhookProcessBody(c *configuration.InputConfig, b []byte) []string {
func WebhookProcessBody(c *configuration.InputConfig, b []byte) []context_string {

strs := []string{}
strs := []context_string{}

switch c.WebhookFormat {
case "text_single":
s := strings.TrimSpace(string(b))
s := context_string{line: strings.TrimSpace(string(b))}
strs = append(strs, s)
case "text_bulk":
s := strings.TrimSpace(string(b))
strs = strings.Split(s, c.WebhookTextBulkSeparator)
lines := strings.Split(s, c.WebhookTextBulkSeparator)
for _, s := range lines {
strs = append(strs, context_string{line: s})
}
case "json_single":
if len(c.WebhookJsonSelector) == 0 || c.WebhookJsonSelector[0] != '.' {
logrus.Errorf("%v: invalid webhook json selector", c.WebhookJsonSelector)
Expand All @@ -130,7 +141,7 @@ func WebhookProcessBody(c *configuration.InputConfig, b []byte) []string {
}).Warn("Unable to find selector path")
break
}
strs = append(strs, s)
strs = append(strs, context_string{line: s, extra: j.MustMap()})
case "json_bulk":
if len(c.WebhookJsonSelector) == 0 || c.WebhookJsonSelector[0] != '.' {
logrus.Errorf("%v: invalid webhook json selector", c.WebhookJsonSelector)
Expand Down Expand Up @@ -158,15 +169,15 @@ func WebhookProcessBody(c *configuration.InputConfig, b []byte) []string {
}).Warn("Unable to find selector path")
break
}
strs = append(strs, s)
strs = append(strs, context_string{line: s, extra: ej.MustMap()})
}
default:
// error silently
}

// Trim whitespace before and after every log entry
for i := range strs {
strs[i] = strings.TrimSpace(strs[i])
strs[i] = context_string{line: strings.TrimSpace(strs[i].line), extra: strs[i].extra}
}

return strs
Expand Down
10 changes: 5 additions & 5 deletions tailer/webhookTailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestWebhookTextSingle(t *testing.T) {
if len(lines) != 1 {
t.Fatal("Expected 1 line processed")
}
if lines[0] != message {
if lines[0].line != message {
t.Fatal("Expected line to match")
}
}
Expand All @@ -62,7 +62,7 @@ func TestWebhookTextBulk(t *testing.T) {
t.Fatal("Expected number of lines to equal number of messages")
}
for i := range messages {
if messages[i] != lines[i] {
if messages[i] != lines[i].line {
t.Fatal("Expected line to match")
}
}
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestWebhookJsonSingle(t *testing.T) {
if len(lines) != 1 {
t.Fatal("Expected 1 line processed")
}
if lines[0] != message {
if lines[0].line != message {
t.Fatal("Expected line to match")
}
}
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestWebhookJsonBulk(t *testing.T) {
t.Fatal("Expected number of lines to equal number of messages")
}
for i := range messages {
if messages[i] != lines[i] {
if messages[i] != lines[i].line {
t.Fatal("Expected line to match")
}
}
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestArraySelector(t *testing.T) {
}
lines := WebhookProcessBody(config, []byte(json))
expected := fmt.Sprintf("line %v", lineNumber)
if len(lines) != 1 || lines[0] != expected {
if len(lines) != 1 || lines[0].line != expected {
t.Fatalf("Expected: []string{\"%v\"}, Actual: %#v", expected, lines)
}
}
Expand Down
Loading