Skip to content

Commit

Permalink
Promtail exports metrics on sent and dropped log entries
Browse files Browse the repository at this point in the history
  • Loading branch information
pracucci committed Aug 30, 2019
1 parent ce3b1e4 commit 7b1336b
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 6 deletions.
4 changes: 4 additions & 0 deletions docs/operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ Promtail metrics:
- `promtail_read_bytes_total` Number of bytes read.
- `promtail_read_lines_total` Number of lines read.
- `promtail_request_duration_seconds_count` Number of send requests.
- `promtail_encoded_bytes_total` Number of bytes encoded and ready to send.
- `promtail_sent_bytes_total` Number of bytes sent.
- `promtail_dropped_bytes_total` Number of bytes dropped because they failed to be sent to the ingester after all retries.
- `promtail_sent_entries_total` Number of log entries sent to the ingester.
- `promtail_dropped_entries_total` Number of log entries dropped because they failed to be sent to the ingester after all retries.

Most of these metrics are counters and should continuously increase during normal operations:

Expand Down
33 changes: 29 additions & 4 deletions pkg/promtail/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,21 @@ var (
Name: "sent_bytes_total",
Help: "Number of bytes sent.",
}, []string{"host"})
// droppedBytes is a per-host counter; sendBatch adds the batch's byte count
// to it when the final send attempt still returns an error.
droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_bytes_total",
Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.",
}, []string{"host"})
// sentEntries is a per-host counter; sendBatch adds the number of entries in
// the batch to it when a send completes without error.
sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_entries_total",
Help: "Number of log entries sent to the ingester.",
}, []string{"host"})
// droppedEntries is a per-host counter; sendBatch adds the number of entries
// in the batch to it when the final send attempt still returns an error.
droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_entries_total",
Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.",
}, []string{"host"})
requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "promtail",
Name: "request_duration_seconds",
Expand All @@ -50,6 +65,9 @@ var (
// init registers every client metric with the default Prometheus registry.
// Registration order matches the declaration order of the collectors, and a
// failed registration panics.
func init() {
	prometheus.MustRegister(
		encodedBytes,
		sentBytes,
		droppedBytes,
		sentEntries,
		droppedEntries,
		requestDuration,
	)
}

Expand Down Expand Up @@ -154,7 +172,7 @@ func (c *client) run() {
}

func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
buf, err := encodeBatch(batch)
buf, entriesCount, err := encodeBatch(batch)
if err != nil {
level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
return
Expand All @@ -172,6 +190,7 @@ func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {

if err == nil {
sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
return
}

Expand All @@ -186,22 +205,28 @@ func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {

if err != nil {
level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
}
}

// encodeBatch collects every stream of the batch into a single PushRequest,
// marshals it with protobuf and compresses the result with snappy.
// It returns the compressed buffer, the total number of entries summed over
// all streams, and any marshalling error (in which case the buffer is nil and
// the count is 0).
func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, error) {
func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, int, error) {
req := logproto.PushRequest{
Streams: make([]*logproto.Stream, 0, len(batch)),
}

// Count entries while building the request so callers can report
// sent/dropped entries without walking the batch a second time.
entriesCount := 0
for _, stream := range batch {
req.Streams = append(req.Streams, stream)
entriesCount += len(stream.Entries)
}

buf, err := proto.Marshal(&req)
if err != nil {
return nil, err
return nil, 0, err
}
// Compress the marshalled request before handing it back.
buf = snappy.Encode(nil, buf)
return buf, nil
return buf, entriesCount, nil
}

func (c *client) send(ctx context.Context, buf []byte) (int, error) {
Expand Down
83 changes: 81 additions & 2 deletions pkg/promtail/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

Expand All @@ -14,17 +15,21 @@ import (
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/grafana/loki/pkg/logproto"
lokiflag "github.com/grafana/loki/pkg/util/flagext"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
)

func TestClient_Handle(t *testing.T) {
logEntries := []entry{
// logEntries holds the log entry fixtures shared by the tests in this file.
// Every entry carries an empty label set, with timestamps one second apart
// and a distinct line.
var logEntries = []entry{
	{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(1, 0).UTC(), Line: "line1"}},
	{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(2, 0).UTC(), Line: "line2"}},
	{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(3, 0).UTC(), Line: "line3"}},
}

func TestClient_Handle(t *testing.T) {
tests := map[string]struct {
clientBatchSize int
clientBatchWait time.Duration
Expand All @@ -33,6 +38,7 @@ func TestClient_Handle(t *testing.T) {
inputEntries []entry
inputDelay time.Duration
expectedBatches [][]*logproto.Stream
expectedMetrics string
}{
"batch log entries together until the batch size is reached": {
clientBatchSize: 10,
Expand All @@ -48,6 +54,11 @@ func TestClient_Handle(t *testing.T) {
{Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}},
},
},
expectedMetrics: `
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 3.0
`,
},
"batch log entries together until the batch wait time is reached": {
clientBatchSize: 10,
Expand All @@ -64,6 +75,11 @@ func TestClient_Handle(t *testing.T) {
{Labels: "{}", Entries: []logproto.Entry{logEntries[1].Entry}},
},
},
expectedMetrics: `
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 2.0
`,
},
"retry send a batch up to backoff's max retries in case the server responds with a 5xx": {
clientBatchSize: 10,
Expand All @@ -82,6 +98,11 @@ func TestClient_Handle(t *testing.T) {
{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}},
},
},
expectedMetrics: `
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__"} 1.0
`,
},
"do not retry send a batch in case the server responds with a 4xx": {
clientBatchSize: 10,
Expand All @@ -94,11 +115,20 @@ func TestClient_Handle(t *testing.T) {
{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}},
},
},
expectedMetrics: `
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__"} 1.0
`,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
// Reset metrics
sentEntries.Reset()
droppedEntries.Reset()

// Create a buffer channel where we do enqueue received requests
receivedReqsChan := make(chan logproto.PushRequest, 10)

Expand Down Expand Up @@ -156,6 +186,55 @@ func TestClient_Handle(t *testing.T) {
for i, batch := range receivedReqs {
assert.Equal(t, testData.expectedBatches[i], batch.Streams)
}

expectedMetrics := strings.Replace(testData.expectedMetrics, "__HOST__", serverURL.Host, -1)
err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total")
assert.NoError(t, err)
})
}
}

// TestClient_encodeBatch verifies that encodeBatch succeeds and reports the
// total number of log entries across all streams of the input batch.
func TestClient_encodeBatch(t *testing.T) {
	t.Parallel()

	type testCase struct {
		name     string
		batch    map[model.Fingerprint]*logproto.Stream
		expected int
	}

	// Pull the fixtures out once so the case table stays readable.
	first, second, third := logEntries[0].Entry, logEntries[1].Entry, logEntries[2].Entry

	cases := []testCase{
		{
			name:     "empty batch",
			batch:    map[model.Fingerprint]*logproto.Stream{},
			expected: 0,
		},
		{
			name: "single stream with single log entry",
			batch: map[model.Fingerprint]*logproto.Stream{
				model.Fingerprint(1): {Labels: "{}", Entries: []logproto.Entry{first}},
			},
			expected: 1,
		},
		{
			name: "single stream with multiple log entries",
			batch: map[model.Fingerprint]*logproto.Stream{
				model.Fingerprint(1): {Labels: "{}", Entries: []logproto.Entry{first, second}},
			},
			expected: 2,
		},
		{
			name: "multiple streams with multiple log entries",
			batch: map[model.Fingerprint]*logproto.Stream{
				model.Fingerprint(1): {Labels: "{}", Entries: []logproto.Entry{first, second}},
				model.Fingerprint(2): {Labels: "{}", Entries: []logproto.Entry{third}},
			},
			expected: 3,
		},
	}

	for _, tc := range cases {
		tc := tc // per-iteration copy: the parallel subtest runs after the loop advances

		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			_, got, err := encodeBatch(tc.batch)
			require.NoError(t, err)
			assert.Equal(t, tc.expected, got)
		})
	}
}
Expand Down

0 comments on commit 7b1336b

Please sign in to comment.