Skip to content

Commit

Permalink
Filebeat Redis prospector type
Browse files Browse the repository at this point in the history
This PR adds a new prospector type which reads the slowlog from redis. This slowlog is not stored in a file but in memory inside redis. Because of this, filebeat connects to redis and reads the slowlog out over the network. It is important to note that the slow log size is limited in redis; that is why, after fetching the events from the slow log, filebeat resets it.

Example event looks as following:

```
{
  "@timestamp": "2017-05-16T06:27:17.000Z",
  "beat": {
    "hostname": "ruflin",
    "name": "ruflin",
    "read_timestamp": "2017-05-16T06:27:19.275Z",
    "version": "6.0.0-alpha2"
  },
  "message": "SET hello world",
  "redis": {
    "slowlog": {
      "args": [
        "world"
      ],
      "cmd": "SET",
      "duration": {
        "us": 11
      },
      "id": 38,
      "key": "hello"
    }
  }
}
```

All args are combined in the "message" field output for easy retrieval.
  • Loading branch information
ruflin committed May 26, 2017
1 parent 2e9d1f7 commit e3e6c2f
Show file tree
Hide file tree
Showing 14 changed files with 417 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d
- Add the option to write the generated Elasticsearch mapping template into a file. {pull}4323[4323]

*Filebeat*
- Add experimental Redis slow log prospector type. {pull}4180[4180]

*Heartbeat*

Expand Down
21 changes: 21 additions & 0 deletions filebeat/_meta/common.full.p2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,27 @@ filebeat.prospectors:
# Configuration to use stdin input
#- type: stdin

#------------------------- Redis slowlog prospector ---------------------------
# Experimental: Config options for the redis slow log prospector
#- type: redis
#hosts: ["localhost:6379"]
#enabled: false
#scan_frequency: 10s

# Timeout after which time the prospector should return an error
#timeout: 1s

# Network type to be used for redis connection. Default: tcp
#network: tcp

# Max number of concurrent connections. Default: 10
#maxconn: 10

# Redis AUTH password. Empty by default.
#password: foobared

#========================= Filebeat global options ============================

# Event count spool threshold - forces network flush if exceeded
Expand Down
5 changes: 5 additions & 0 deletions filebeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ services:
- proxy_dep
env_file:
- ${PWD}/build/test.env
- ${PWD}/prospector/redis/_meta/env
working_dir: /go/src/github.com/elastic/beats/filebeat
volumes:
- ${PWD}/..:/go/src/github.com/elastic/beats/
Expand All @@ -17,8 +18,12 @@ services:
image: busybox
depends_on:
elasticsearch: { condition: service_healthy }
redis: { condition: service_healthy }

elasticsearch:
extends:
file: ../testing/environments/${TESTING_ENVIRONMENT}.yml
service: elasticsearch

redis:
build: ${PWD}/prospector/redis/_meta
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ One of the following input types:

* log: Reads every line of the log file (default)
* stdin: Reads from standard input
* redis: Reads slow log entries from redis (experimental)

The value that you specify here is used as the `type` for each event published to Logstash and Elasticsearch.

Expand Down
21 changes: 21 additions & 0 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,27 @@ filebeat.prospectors:
# Configuration to use stdin input
#- type: stdin

#------------------------- Redis slowlog prospector ---------------------------
# Experimental: Config options for the redis slow log prospector
#- type: redis
#hosts: ["localhost:6379"]
#enabled: false
#scan_frequency: 10s

# Timeout after which time the prospector should return an error
#timeout: 1s

# Network type to be used for redis connection. Default: tcp
#network: tcp

# Max number of concurrent connections. Default: 10
#maxconn: 10

# Redis AUTH password. Empty by default.
#password: foobared

#========================= Filebeat global options ============================

# Event count spool threshold - forces network flush if exceeded
Expand Down
4 changes: 3 additions & 1 deletion filebeat/harvester/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import "github.com/elastic/beats/libbeat/common/match"
// Input type identifiers used as the prospector/harvester `type` setting.
const (
	// LogType reads lines from log files.
	LogType = "log"
	// StdinType reads from standard input.
	StdinType = "stdin"
	// RedisType reads slow log entries from a redis server.
	RedisType = "redis"
)

// ValidType is the set of all valid input types.
var ValidType = map[string]struct{}{
	StdinType: {},
	LogType:   {},
	RedisType: {},
}

// MatchAny checks if the text matches any of the regular expressions
Expand Down
3 changes: 3 additions & 0 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/prospector/log"
"github.com/elastic/beats/filebeat/prospector/redis"
"github.com/elastic/beats/filebeat/prospector/stdin"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -72,6 +73,8 @@ func (p *Prospector) initProspectorer(outlet channel.Outleter, states []file.Sta
switch p.config.Type {
case harvester.StdinType:
prospectorer, err = stdin.NewProspector(config, outlet)
case harvester.RedisType:
prospectorer, err = redis.NewProspector(config, outlet)
case harvester.LogType:
prospectorer, err = log.NewProspector(config, states, outlet, p.done)
default:
Expand Down
2 changes: 2 additions & 0 deletions filebeat/prospector/redis/_meta/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Redis image used for integration testing of the redis slow log prospector.
FROM redis:3.2.4-alpine
# Report the container healthy once redis accepts TCP connections on 6379.
HEALTHCHECK CMD nc -z localhost 6379
2 changes: 2 additions & 0 deletions filebeat/prospector/redis/_meta/env
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
REDIS_HOST=redis
REDIS_PORT=6379
27 changes: 27 additions & 0 deletions filebeat/prospector/redis/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package redis

import (
"time"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester"
)

// defaultConfig contains the default settings for the redis slow log
// prospector. Hosts has no default because it is required; IdleTimeout
// defaults to its zero value — NOTE(review): confirm how the connection
// handling treats a zero idle timeout.
var defaultConfig = config{
	ForwarderConfig: harvester.ForwarderConfig{
		Type: cfg.DefaultType,
	},
	Network:  "tcp",
	MaxConn:  10,
	Password: "",
}

// config holds the configuration options of the redis slow log prospector.
type config struct {
	harvester.ForwarderConfig `config:",inline"`

	// Hosts is the list of redis hosts to fetch the slow log from (required).
	Hosts []string `config:"hosts" validate:"required"`
	// IdleTimeout is the connection idle timeout. NOTE(review): semantics of
	// the zero value are not visible here — verify against the prospector.
	IdleTimeout time.Duration `config:"idle_timeout"`
	// Network is the network type used for the redis connection (default: tcp).
	Network string `config:"network"`
	// MaxConn is the maximum number of concurrent connections (default: 10, min: 1).
	MaxConn int `config:"maxconn" validate:"min=1"`
	// Password is the redis AUTH password; empty by default (no auth).
	Password string `config:"password"`
}
14 changes: 14 additions & 0 deletions filebeat/prospector/redis/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Package redis contains a prospector and harvester to read the redis slow log.
//
// The redis slow log is stored in memory. The slow log can be activated on the
// redis command line as follows:
//
//	CONFIG SET slowlog-log-slower-than 2000000
//
// This sets the threshold of the slow log to 2000000 microseconds (2s). All
// queries taking longer are reported.
//
// As the slow log is kept in memory, the number of entries it holds can be
// configured:
//
//	CONFIG SET slowlog-max-len 200
//
// This sets the size of the slow log to 200 entries. In case the slow log is
// full, older entries are dropped.
package redis
155 changes: 155 additions & 0 deletions filebeat/prospector/redis/harvester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package redis

import (
"fmt"
"time"

"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"

"strings"

"github.com/elastic/beats/filebeat/harvester"
rd "github.com/garyburd/redigo/redis"
"github.com/satori/go.uuid"
)

// Harvester contains all redis harvester data.
type Harvester struct {
	id   uuid.UUID     // unique id identifying this harvester instance
	done chan struct{} // closed by Stop to make Run return early
	conn rd.Conn       // redis connection the slow log is read from; closed by Run
	// forwarder publishes the harvested events. NOTE(review): it is not set
	// by NewHarvester — presumably assigned by the owning prospector before
	// Run is called; verify, since Run would panic on a nil forwarder.
	forwarder *harvester.Forwarder
}

// log contains all data related to one slowlog entry
//
// The data is in the following format:
//  1) (integer) 13
//  2) (integer) 1309448128
//  3) (integer) 30
//  4) 1) "slowlog"
//     2) "get"
//     3) "100"
//
type log struct {
	id        int64    // slowlog entry id as reported by redis
	timestamp int64    // unix timestamp (seconds) of when the command ran
	duration  int      // execution time in microseconds (reported as duration.us)
	cmd       string   // the redis command (first element of the args array)
	key       string   // the key operated on (second element of the args array)
	args      []string // the remaining command arguments, if any
}

// NewHarvester creates a new harvester that reads the slow log over the
// given redis connection.
func NewHarvester(conn rd.Conn) *Harvester {
	h := &Harvester{
		conn: conn,
		done: make(chan struct{}),
	}
	h.id = uuid.NewV4()
	return h
}

// Run fetches the slow log from redis once and forwards every entry as an
// event. SLOWLOG GET and SLOWLOG RESET are pipelined so the log is read and
// cleared together; entries logged between the two commands are lost, as are
// unprocessed entries when the harvester is stopped mid-loop.
func (h *Harvester) Run() error {
	defer h.conn.Close()

	// Bail out early if the harvester was stopped before it started.
	select {
	case <-h.done:
		return nil
	default:
	}

	// Writes SLOWLOG GET and SLOWLOG RESET both to the buffer so they are
	// executed together. Send/Flush errors are checked so a broken
	// connection is reported here instead of as a confusing Receive error.
	if err := h.conn.Send("SLOWLOG", "GET"); err != nil {
		return fmt.Errorf("error sending slowlog get: %s", err)
	}
	if err := h.conn.Send("SLOWLOG", "RESET"); err != nil {
		return fmt.Errorf("error sending slowlog reset: %s", err)
	}

	// Flush the buffer to execute both commands and receive the reply from SLOWLOG GET
	if err := h.conn.Flush(); err != nil {
		return fmt.Errorf("error flushing slowlog commands: %s", err)
	}

	// Receives first reply from redis which is the one from GET
	logs, err := rd.Values(h.conn.Receive())
	if err != nil {
		return fmt.Errorf("error receiving slowlog data: %s", err)
	}

	// Read reply from RESET
	if _, err = h.conn.Receive(); err != nil {
		return fmt.Errorf("error receiving reset data: %s", err)
	}

	for _, item := range logs {
		// Stopping here means some of the slowlog events are lost!
		select {
		case <-h.done:
			return nil
		default:
		}

		entry, err := rd.Values(item, nil)
		if err != nil {
			logp.Err("Error loading slowlog values: %s", err)
			continue
		}

		var log log
		var args []string
		// Skip entries that cannot be decoded instead of forwarding a
		// zero-valued event (the original ignored this error).
		if _, err := rd.Scan(entry, &log.id, &log.timestamp, &log.duration, &args); err != nil {
			logp.Err("Error scanning slowlog entry: %s", err)
			continue
		}

		// This splits up the args into cmd, key, args.
		argsLen := len(args)
		if argsLen > 0 {
			log.cmd = args[0]
		}
		if argsLen > 1 {
			log.key = args[1]
		}

		// This could contain confidential data, processors should be used to drop it if needed
		if argsLen > 2 {
			log.args = args[2:]
		}

		data := util.NewData()
		subEvent := common.MapStr{
			"id":  log.id,
			"cmd": log.cmd,
			"key": log.key,
			"duration": common.MapStr{
				"us": log.duration,
			},
		}

		if log.args != nil {
			subEvent["args"] = log.args
		}

		data.Event = common.MapStr{
			"@timestamp": common.Time(time.Unix(log.timestamp, 0).UTC()),
			// All args are combined into the message field for easy retrieval.
			"message": strings.Join(args, " "),
			"redis": common.MapStr{
				"slowlog": subEvent,
			},
			"beat": common.MapStr{
				"read_timestamp": common.Time(time.Now()),
			},
			"prospector": common.MapStr{
				"type": "redis",
			},
		}

		h.forwarder.Send(data)
	}
	return nil
}

// Stop stops the harvester by closing its done channel, which makes Run
// return. It must be called at most once, as closing an already closed
// channel panics.
func (h *Harvester) Stop() {
	close(h.done)
}

// ID returns the unique harvester ID assigned at construction time.
func (h *Harvester) ID() uuid.UUID {
	return h.id
}
Loading

0 comments on commit e3e6c2f

Please sign in to comment.