Promtail: Add tenant label to client drop metrics and logs #7593

Merged · 3 commits · Nov 28, 2022
CHANGELOG.md (2 changes: 1 addition & 1 deletion)
@@ -92,7 +92,7 @@ Check the history of the branch FIXME.
#### Promtail

##### Enhancements

+ * [7593](https://github.com/grafana/loki/pull/7593) **chodges15**: Promtail: Add tenant label to client drop metrics and logs
* [7101](https://github.com/grafana/loki/pull/7101) **liguozhong**: Promtail: Add support for max stream limit.
* [7247](https://github.com/grafana/loki/pull/7247) **liguozhong**: Add config reload endpoint / signal to promtail.
* [6708](https://github.com/grafana/loki/pull/6708) **DylanGuedes**: Add compressed files support to Promtail.
clients/pkg/promtail/client/client.go (52 changes: 32 additions & 20 deletions)
@@ -38,20 +38,22 @@ const (
LatencyLabel = "filename"
HostLabel = "host"
ClientLabel = "client"
+ TenantLabel = "tenant"
)

var UserAgent = fmt.Sprintf("promtail/%s", build.Version)

type Metrics struct {
- encodedBytes *prometheus.CounterVec
- sentBytes *prometheus.CounterVec
- droppedBytes *prometheus.CounterVec
- sentEntries *prometheus.CounterVec
- droppedEntries *prometheus.CounterVec
- requestDuration *prometheus.HistogramVec
- batchRetries *prometheus.CounterVec
- countersWithHost []*prometheus.CounterVec
- streamLag *prometheus.GaugeVec
+ encodedBytes *prometheus.CounterVec
+ sentBytes *prometheus.CounterVec
+ droppedBytes *prometheus.CounterVec
+ sentEntries *prometheus.CounterVec
+ droppedEntries *prometheus.CounterVec
+ requestDuration *prometheus.HistogramVec
+ batchRetries *prometheus.CounterVec
+ countersWithHost []*prometheus.CounterVec
+ countersWithTenant []*prometheus.CounterVec
+ streamLag *prometheus.GaugeVec
}

func NewMetrics(reg prometheus.Registerer, streamLagLabels []string) *Metrics {
@@ -71,7 +73,7 @@ func NewMetrics(reg prometheus.Registerer, streamLagLabels []string) *Metrics {
Namespace: "promtail",
Name: "dropped_bytes_total",
Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.",
- }, []string{HostLabel})
+ }, []string{HostLabel, TenantLabel})
m.sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_entries_total",
@@ -81,7 +83,7 @@ func NewMetrics(reg prometheus.Registerer, streamLagLabels []string) *Metrics {
Namespace: "promtail",
Name: "dropped_entries_total",
Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.",
- }, []string{HostLabel})
+ }, []string{HostLabel, TenantLabel})
m.requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "promtail",
Name: "request_duration_seconds",
@@ -91,10 +93,14 @@ func NewMetrics(reg prometheus.Registerer, streamLagLabels []string) *Metrics {
Namespace: "promtail",
Name: "batch_retries_total",
Help: "Number of times batches has had to be retried.",
- }, []string{HostLabel})
+ }, []string{HostLabel, TenantLabel})

m.countersWithHost = []*prometheus.CounterVec{
- m.encodedBytes, m.sentBytes, m.droppedBytes, m.sentEntries, m.droppedEntries, m.batchRetries,
+ m.encodedBytes, m.sentBytes, m.sentEntries,
}

+ m.countersWithTenant = []*prometheus.CounterVec{
+ m.droppedBytes, m.droppedEntries, m.batchRetries,
+ }

streamLagLabelsMerged := []string{HostLabel, ClientLabel}
@@ -270,6 +276,11 @@ func (c *client) run() {
// If the batch doesn't exist yet, we create a new one with the entry
if !ok {
batches[tenantID] = newBatch(c.maxStreams, e)
+ // Initialize counters to 0 so the metrics are exported before the first
+ // occurrence of incrementing to avoid missing metrics.
+ for _, counter := range c.metrics.countersWithTenant {
+ counter.WithLabelValues(c.cfg.URL.Host, tenantID).Add(0)
+ }
break
}

@@ -285,8 +296,9 @@ func (c *client) run() {
// The max size of the batch isn't reached, so we can add the entry
err := batch.add(e)
if err != nil {
- level.Error(c.logger).Log("msg", "batch add err", "error", err)
- c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host).Inc()
+ level.Error(c.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err)
+ c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID).Add(float64(len(e.Line)))
+ c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID).Inc()
return
}
case <-maxWaitCheck.C:
@@ -376,8 +388,8 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
break
}

- level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)
- c.metrics.batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()
+ level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "tenant", tenantID, "error", err)
+ c.metrics.batchRetries.WithLabelValues(c.cfg.URL.Host, tenantID).Inc()
backoff.Wait()

// Make sure it sends at least once before checking for retry.
@@ -387,9 +399,9 @@ }
}

if err != nil {
- level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
- c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
- c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
+ level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "tenant", tenantID, "error", err)
+ c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID).Add(bufBytes)
+ c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID).Add(float64(entriesCount))
}
}

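For reference, below is a minimal standalone sketch of the pattern introduced above, assuming the upstream `prometheus/client_golang` API; the registry setup, host value, tenant names, and port are illustrative placeholders, not code from this PR. The drop counters carry both `host` and `tenant` labels, and each tenant's series is pre-initialized to 0 the first time that tenant is seen, so the series is exposed before the first drop is ever counted.

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	reg := prometheus.NewRegistry()

	// Dropped-entries counter keyed by host and tenant, mirroring the labels
	// added to promtail_dropped_entries_total in this change.
	droppedEntries := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "dropped_entries_total",
		Help:      "Number of log entries dropped because failed to be sent to the ingester after all retries.",
	}, []string{"host", "tenant"})
	reg.MustRegister(droppedEntries)

	// Pre-initialize each tenant's series to 0 when the tenant is first seen,
	// so the series appears in the scrape before the first increment.
	for _, tenant := range []string{"tenant-1", "tenant-2"} { // placeholder tenants
		droppedEntries.WithLabelValues("loki:3100", tenant).Add(0)
	}

	// An actual drop then increments only that tenant's series.
	droppedEntries.WithLabelValues("loki:3100", "tenant-1").Inc()

	// Expose the metrics; placeholder port, not part of the PR.
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":9090", nil))
}
```

With this in place, a query such as `sum by (tenant) (rate(promtail_dropped_entries_total[5m]))` can attribute drops to a tenant, and the zero-valued series exists even before any drop has occurred for it.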
clients/pkg/promtail/client/client_test.go (20 changes: 11 additions & 9 deletions)
@@ -75,7 +75,7 @@ func TestClient_Handle(t *testing.T) {
promtail_sent_entries_total{host="__HOST__"} 3.0
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
- promtail_dropped_entries_total{host="__HOST__"} 0
+ promtail_dropped_entries_total{host="__HOST__", tenant=""} 0
`,
},
"batch log entries together until the batch wait time is reached": {
@@ -101,7 +101,7 @@ func TestClient_Handle(t *testing.T) {
promtail_sent_entries_total{host="__HOST__"} 2.0
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
- promtail_dropped_entries_total{host="__HOST__"} 0
+ promtail_dropped_entries_total{host="__HOST__", tenant=""} 0
`,
},
"retry send a batch up to backoff's max retries in case the server responds with a 5xx": {
@@ -127,7 +127,7 @@ func TestClient_Handle(t *testing.T) {
expectedMetrics: `
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
- promtail_dropped_entries_total{host="__HOST__"} 1.0
+ promtail_dropped_entries_total{host="__HOST__", tenant=""} 1.0
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 0
@@ -148,7 +148,7 @@ func TestClient_Handle(t *testing.T) {
expectedMetrics: `
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
- promtail_dropped_entries_total{host="__HOST__"} 1.0
+ promtail_dropped_entries_total{host="__HOST__", tenant=""} 1.0
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 0
@@ -177,7 +177,7 @@ func TestClient_Handle(t *testing.T) {
expectedMetrics: `
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
- promtail_dropped_entries_total{host="__HOST__"} 1.0
+ promtail_dropped_entries_total{host="__HOST__", tenant=""} 1.0
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 0
@@ -202,7 +202,7 @@ func TestClient_Handle(t *testing.T) {
promtail_sent_entries_total{host="__HOST__"} 2.0
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
- promtail_dropped_entries_total{host="__HOST__"} 0
+ promtail_dropped_entries_total{host="__HOST__", tenant="tenant-default"} 0
`,
},
"batch log entries together honoring the tenant ID overridden while processing the pipeline stages": {
Expand Down Expand Up @@ -232,7 +232,9 @@ func TestClient_Handle(t *testing.T) {
promtail_sent_entries_total{host="__HOST__"} 4.0
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
- promtail_dropped_entries_total{host="__HOST__"} 0
+ promtail_dropped_entries_total{host="__HOST__", tenant="tenant-1"} 0
+ promtail_dropped_entries_total{host="__HOST__", tenant="tenant-2"} 0
+ promtail_dropped_entries_total{host="__HOST__", tenant="tenant-default"} 0
`,
},
}
@@ -343,7 +345,7 @@ func TestClient_StopNow(t *testing.T) {
promtail_sent_entries_total{host="__HOST__"} 3.0
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
- promtail_dropped_entries_total{host="__HOST__"} 0
+ promtail_dropped_entries_total{host="__HOST__", tenant=""} 0
`,
},
{
@@ -362,7 +364,7 @@ func TestClient_StopNow(t *testing.T) {
expectedMetrics: `
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
- promtail_dropped_entries_total{host="__HOST__"} 1.0
+ promtail_dropped_entries_total{host="__HOST__", tenant=""} 1.0
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 0
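As a rough sketch of how the tenant-labeled series could be checked outside the full Promtail test harness, the snippet below uses `prometheus/testutil` in the same spirit as the expected-metrics strings above; the host and tenant values are placeholders, and this is not the PR's actual test, which drives the real client against a stubbed server.

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	dropped := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "dropped_entries_total",
		Help:      "Number of log entries dropped because failed to be sent to the ingester after all retries.",
	}, []string{"host", "tenant"})

	// One real drop for tenant-1, and a pre-initialized (zero) series for tenant-2.
	dropped.WithLabelValues("loki:3100", "tenant-1").Inc()
	dropped.WithLabelValues("loki:3100", "tenant-2").Add(0)

	expected := `
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="loki:3100",tenant="tenant-1"} 1
promtail_dropped_entries_total{host="loki:3100",tenant="tenant-2"} 0
`
	// CollectAndCompare parses the expected exposition text and diffs it
	// against what the collector actually reports.
	if err := testutil.CollectAndCompare(dropped, strings.NewReader(expected)); err != nil {
		fmt.Println("unexpected metrics:", err)
		return
	}
	fmt.Println("tenant-labeled drop metrics match the expected exposition")
}
```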