Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

promtail: Add max-line-size-truncate #8233

Merged
merged 4 commits on Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
* [7973](https://github.com/grafana/loki/pull/7973) **chodges15**: Add configuration to drop rate limited batches in Loki client and new metric label for drop reason.
* [8153](https://github.com/grafana/loki/pull/8153) **kavirajk**: promtail: Add `max-line-size` limit to drop on client side
* [8096](https://github.com/grafana/loki/pull/8096) **kavirajk**: doc(promtail): Doc about how log rotate works with promtail
* [8233](https://github.com/grafana/loki/pull/8233) **nicoche**: promtail: Add `max-line-size-truncate` limit to truncate too long lines on client side


##### Enhancements
Expand Down
2 changes: 1 addition & 1 deletion clients/cmd/docker-driver/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func New(logCtx logger.Info, logger log.Logger) (logger.Logger, error) {
return nil, err
}
m := client.NewMetrics(prometheus.DefaultRegisterer)
c, err := client.New(m, cfg.clientConfig, 0, 0, logger)
c, err := client.New(m, cfg.clientConfig, 0, 0, false, logger)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion clients/cmd/fluent-bit/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ func NewClient(cfg *config, logger log.Logger, metrics *client.Metrics) (client.
if cfg.bufferConfig.buffer {
return NewBuffer(cfg, logger, metrics)
}
return client.New(metrics, cfg.clientConfig, 0, 0, logger)
return client.New(metrics, cfg.clientConfig, 0, 0, false, logger)
}
2 changes: 1 addition & 1 deletion clients/cmd/fluent-bit/dque.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func newDque(cfg *config, logger log.Logger, metrics *client.Metrics) (client.Cl
_ = q.queue.TurboOn()
}

q.loki, err = client.New(metrics, cfg.clientConfig, 0, 0, logger)
q.loki, err = client.New(metrics, cfg.clientConfig, 0, 0, false, logger)
if err != nil {
return nil, err
}
Expand Down
99 changes: 61 additions & 38 deletions clients/pkg/promtail/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ const (
// pipeline stages
ReservedLabelTenantID = "__tenant_id__"

LatencyLabel = "filename"
HostLabel = "host"
ClientLabel = "client"
TenantLabel = "tenant"
DropReasonLabel = "reason"

DropReasonGeneric = "ingester_error"
DropReasonRateLimited = "rate_limited"
DropReasonStreamLimited = "stream_limited"
DropReasongMaxLineSizeLimited = "max_line_size_limited"
LatencyLabel = "filename"
HostLabel = "host"
ClientLabel = "client"
TenantLabel = "tenant"
ReasonLabel = "reason"

ReasonGeneric = "ingester_error"
ReasonRateLimited = "rate_limited"
ReasonStreamLimited = "stream_limited"
ReasonLineTooLong = "line_too_long"
)

var DropReasons = []string{DropReasonGeneric, DropReasonRateLimited, DropReasonStreamLimited}
var Reasons = []string{ReasonGeneric, ReasonRateLimited, ReasonStreamLimited, ReasonLineTooLong}

var UserAgent = fmt.Sprintf("promtail/%s", build.Version)

Expand All @@ -56,6 +56,8 @@ type Metrics struct {
droppedBytes *prometheus.CounterVec
sentEntries *prometheus.CounterVec
droppedEntries *prometheus.CounterVec
mutatedEntries *prometheus.CounterVec
mutatedBytes *prometheus.CounterVec
requestDuration *prometheus.HistogramVec
batchRetries *prometheus.CounterVec
countersWithHost []*prometheus.CounterVec
Expand All @@ -80,7 +82,7 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
Namespace: "promtail",
Name: "dropped_bytes_total",
Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.",
}, []string{HostLabel, TenantLabel, DropReasonLabel})
}, []string{HostLabel, TenantLabel, ReasonLabel})
m.sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_entries_total",
Expand All @@ -90,7 +92,17 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
Namespace: "promtail",
Name: "dropped_entries_total",
Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.",
}, []string{HostLabel, TenantLabel, DropReasonLabel})
}, []string{HostLabel, TenantLabel, ReasonLabel})
m.mutatedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "mutated_entries_total",
Help: "The total number of log entries that have been mutated.",
}, []string{HostLabel, TenantLabel, ReasonLabel})
m.mutatedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "mutated_bytes_total",
Help: "The total number of bytes that have been mutated.",
}, []string{HostLabel, TenantLabel, ReasonLabel})
m.requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "promtail",
Name: "request_duration_seconds",
Expand All @@ -111,7 +123,7 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
}

m.countersWithHostTenantReason = []*prometheus.CounterVec{
m.droppedBytes, m.droppedEntries,
m.droppedBytes, m.droppedEntries, m.mutatedEntries, m.mutatedBytes,
}

if reg != nil {
Expand All @@ -120,6 +132,8 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
m.droppedBytes = mustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec)
m.sentEntries = mustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec)
m.droppedEntries = mustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec)
m.mutatedEntries = mustRegisterOrGet(reg, m.mutatedEntries).(*prometheus.CounterVec)
m.mutatedBytes = mustRegisterOrGet(reg, m.mutatedBytes).(*prometheus.CounterVec)
m.requestDuration = mustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec)
m.batchRetries = mustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec)
}
Expand Down Expand Up @@ -160,24 +174,25 @@ type client struct {
externalLabels model.LabelSet

// ctx is used in any upstream calls from the `client`.
ctx context.Context
cancel context.CancelFunc
maxStreams int
maxLineSize int
ctx context.Context
cancel context.CancelFunc
maxStreams int
maxLineSize int
maxLineSizeTruncate bool
}

// Tripperware can wrap a roundtripper.
type Tripperware func(http.RoundTripper) http.RoundTripper

// New makes a new Client.
func New(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, logger log.Logger) (Client, error) {
func New(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger) (Client, error) {
if cfg.StreamLagLabels.String() != "" {
return nil, fmt.Errorf("client config stream_lag_labels is deprecated and the associated metric has been removed, stream_lag_labels: %+v", cfg.StreamLagLabels.String())
}
return newClient(metrics, cfg, maxStreams, maxLineSize, logger)
return newClient(metrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger)
}

func newClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, logger log.Logger) (*client, error) {
func newClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger) (*client, error) {

if cfg.URL.URL == nil {
return nil, errors.New("client needs target URL")
Expand All @@ -192,11 +207,12 @@ func newClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, logger
metrics: metrics,
name: asSha256(cfg),

externalLabels: cfg.ExternalLabels.LabelSet,
ctx: ctx,
cancel: cancel,
maxStreams: maxStreams,
maxLineSize: maxLineSize,
externalLabels: cfg.ExternalLabels.LabelSet,
ctx: ctx,
cancel: cancel,
maxStreams: maxStreams,
maxLineSize: maxLineSize,
maxLineSizeTruncate: maxLineSizeTruncate,
}
if cfg.Name != "" {
c.name = cfg.Name
Expand Down Expand Up @@ -226,8 +242,8 @@ func newClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, logger
}

// NewWithTripperware creates a new Loki client with a custom tripperware.
func NewWithTripperware(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, logger log.Logger, tp Tripperware) (Client, error) {
c, err := newClient(metrics, cfg, maxStreams, maxLineSize, logger)
func NewWithTripperware(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, tp Tripperware) (Client, error) {
c, err := newClient(metrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger)
if err != nil {
return nil, err
}
Expand All @@ -243,7 +259,7 @@ func (c *client) initBatchMetrics(tenantID string) {
// Initialize counters to 0 so the metrics are exported before the first
// occurrence of incrementing to avoid missing metrics.
for _, counter := range c.metrics.countersWithHostTenantReason {
for _, reason := range DropReasons {
for _, reason := range Reasons {
counter.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(0)
}
}
Expand Down Expand Up @@ -289,10 +305,17 @@ func (c *client) run() {

e, tenantID := c.processEntry(e)

// drop the entry because its length is greater than maxLineSize. maxLineSize == 0 means disabled.
// Either drop or mutate the log entry because its length is greater than maxLineSize. maxLineSize == 0 means disabled.
if c.maxLineSize != 0 && len(e.Line) > c.maxLineSize {
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, DropReasongMaxLineSizeLimited).Inc()
break
if !c.maxLineSizeTruncate {
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Inc()
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Add(float64(len(e.Line)))
break
}

c.metrics.mutatedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Inc()
c.metrics.mutatedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Add(float64(len(e.Line) - c.maxLineSize))
e.Line = e.Line[:c.maxLineSize]
}

batch, ok := batches[tenantID]
Expand All @@ -317,9 +340,9 @@ func (c *client) run() {
err := batch.add(e)
if err != nil {
level.Error(c.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err)
reason := DropReasonGeneric
reason := ReasonGeneric
if err.Error() == errMaxStreamsLimitExceeded {
reason = DropReasonStreamLimited
reason = ReasonStreamLimited
}
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(float64(len(e.Line)))
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Inc()
Expand Down Expand Up @@ -376,8 +399,8 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
// Immediately drop rate limited batches to avoid HOL blocking for other tenants not experiencing throttling
if c.cfg.DropRateLimitedBatches && batchIsRateLimited(status) {
level.Warn(c.logger).Log("msg", "dropping batch due to rate limiting applied at ingester")
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, DropReasonRateLimited).Add(bufBytes)
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, DropReasonRateLimited).Add(float64(entriesCount))
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonRateLimited).Add(bufBytes)
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonRateLimited).Add(float64(entriesCount))
return
}

Expand Down Expand Up @@ -407,9 +430,9 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "tenant", tenantID, "error", err)
// If the reason for the last retry error was rate limiting, count the drops as such, even if the previous errors
// were for a different reason
dropReason := DropReasonGeneric
dropReason := ReasonGeneric
if batchIsRateLimited(status) {
dropReason = DropReasonRateLimited
dropReason = ReasonRateLimited
}
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, dropReason).Add(bufBytes)
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, dropReason).Add(float64(entriesCount))
Expand Down
Loading