diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go
index e6461c676b..c35c3bea41 100644
--- a/pkg/receive/handler.go
+++ b/pkg/receive/handler.go
@@ -259,28 +259,27 @@ type replica struct {
 }
 
 func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error {
+	tLogger := log.With(h.logger, "tenant", tenant)
+
 	// This replica value is used to detect cycles in cyclic topologies.
 	// A non-zero value indicates that the request has already been replicated by a previous receive instance.
 	// For almost all users, this is only used in fully connected topologies of IngestorRouter instances.
 	// For acyclic topologies that use RouterOnly and IngestorOnly instances, this causes issues when replicating data.
-	// See discussion in: https://github.com/thanos-io/thanos/issues/4359
+	// See discussion in: https://github.com/thanos-io/thanos/issues/4359.
 	if h.receiverMode == RouterOnly || h.receiverMode == IngestorOnly {
 		rep = 0
 	}
 
 	// The replica value in the header is one-indexed, thus we need >.
 	if rep > h.options.ReplicationFactor {
-		level.Error(h.logger).Log("err", errBadReplica, "msg", "write request rejected",
+		level.Error(tLogger).Log("err", errBadReplica, "msg", "write request rejected",
 			"request_replica", rep, "replication_factor", h.options.ReplicationFactor)
 		return errBadReplica
 	}
 
-	r := replica{
-		n:          rep,
-		replicated: rep != 0,
-	}
+	r := replica{n: rep, replicated: rep != 0}
 
-	// On the wire, format is 1-indexed and in-code is 0-indexed so we decrement the value if it was already replicated.
+	// On the wire, format is 1-indexed and in-code is 0-indexed, so we decrement the value if it was already replicated.
 	if r.replicated {
 		r.n--
 	}
@@ -295,6 +294,13 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
 	span, ctx := tracing.StartSpan(r.Context(), "receive_http")
 	defer span.Finish()
 
+	tenant := r.Header.Get(h.options.TenantHeader)
+	if tenant == "" {
+		tenant = h.options.DefaultTenantID
+	}
+
+	tLogger := log.With(h.logger, "tenant", tenant)
+
 	// ioutil.ReadAll dynamically adjust the byte slice for read data, starting from 512B.
 	// Since this is receive hot path, grow upfront saving allocations and CPU time.
 	compressed := bytes.Buffer{}
@@ -311,7 +317,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
 
 	reqBuf, err := s2.Decode(nil, compressed.Bytes())
 	if err != nil {
-		level.Error(h.logger).Log("msg", "snappy decode error", "err", err)
+		level.Error(tLogger).Log("msg", "snappy decode error", "err", err)
 		http.Error(w, errors.Wrap(err, "snappy decode error").Error(), http.StatusBadRequest)
 		return
 	}
@@ -334,21 +340,22 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
 		}
 	}
 
-	tenant := r.Header.Get(h.options.TenantHeader)
-	if tenant == "" {
-		tenant = h.options.DefaultTenantID
-	}
-
-	// TODO(yeya24): handle remote write metadata.
-	// exit early if the request contained no data
+	// Exit early if the request contained no data. We don't support metadata yet. We also cannot fail here, because
+	// this would mean lack of forward compatibility for remote write proto.
 	if len(wreq.Timeseries) == 0 {
-		level.Debug(h.logger).Log("msg", "empty timeseries from client", "tenant", tenant)
+		// TODO(yeya24): Handle remote write metadata.
+		if len(wreq.Metadata) > 0 {
+			// TODO(bwplotka): Do we need this error message?
+			level.Debug(tLogger).Log("msg", "only metadata from client; metadata ingestion not supported; skipping")
+			return
+		}
+		level.Debug(tLogger).Log("msg", "empty remote write request; client bug or newer remote write protocol used?; skipping")
 		return
 	}
 
 	err = h.handleRequest(ctx, rep, tenant, &wreq)
 	if err != nil {
-		level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
+		level.Debug(tLogger).Log("msg", "failed to handle request", "err", err)
 	}
 
 	switch determineWriteErrorCause(err, 1) {
@@ -363,7 +370,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
 	case errBadReplica:
 		http.Error(w, err.Error(), http.StatusBadRequest)
 	default:
-		level.Error(h.logger).Log("err", err, "msg", "internal server error")
+		level.Error(tLogger).Log("err", err, "msg", "internal server error")
 		http.Error(w, err.Error(), http.StatusInternalServerError)
 	}
 }
@@ -436,9 +443,13 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
 		}
 	}()
 
-	logTags := []interface{}{"tenant", tenant}
-	if id, ok := middleware.RequestIDFromContext(pctx); ok {
-		logTags = append(logTags, "request-id", id)
+	var tLogger log.Logger
+	{
+		logTags := []interface{}{"tenant", tenant}
+		if id, ok := middleware.RequestIDFromContext(pctx); ok {
+			logTags = append(logTags, "request-id", id)
+		}
+		tLogger = log.With(h.logger, logTags...)
 	}
 
 	ec := make(chan error)
@@ -488,7 +499,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
 			if err != nil {
 				// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
 				// To avoid breaking the counting logic, we need to flatten the error.
-				level.Debug(h.logger).Log(append(logTags, "msg", "local tsdb write failed", "err", err.Error()))
+				level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error())
 				ec <- errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)
 				return
 			}
@@ -551,7 +562,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
 				b.attempt++
 				dur := h.expBackoff.ForAttempt(b.attempt)
 				b.nextAllowed = time.Now().Add(dur)
-				level.Debug(h.logger).Log(append(logTags, "msg", "target unavailable backing off", "for", dur))
+				level.Debug(tLogger).Log("msg", "target unavailable backing off", "for", dur)
 			} else {
 				h.peerStates[endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))}
 			}
@@ -580,7 +591,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
 	go func() {
 		for err := range ec {
 			if err != nil {
-				level.Debug(h.logger).Log(append(logTags, "msg", "request failed, but not needed to achieve quorum", "err", err))
+				level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", err)
 			}
 		}
 	}()
diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go
index decc5c68f5..90bb2897c4 100644
--- a/pkg/receive/writer.go
+++ b/pkg/receive/writer.go
@@ -40,6 +40,8 @@ func NewWriter(logger log.Logger, multiTSDB TenantStorage) *Writer {
 }
 
 func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteRequest) error {
+	tLogger := log.With(r.logger, "tenant", tenantID)
+
 	var (
 		numOutOfOrder  = 0
 		numDuplicates  = 0
@@ -85,13 +87,13 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
 			switch err {
 			case storage.ErrOutOfOrderSample:
 				numOutOfOrder++
-				level.Debug(r.logger).Log("msg", "Out of order sample", "lset", lset, "sample", s)
+				level.Debug(tLogger).Log("msg", "Out of order sample", "lset", lset, "value", s.Value, "timestamp", s.Timestamp)
"lset", lset, "value", s.Value, "timestamp", s.Timestamp) case storage.ErrDuplicateSampleForTimestamp: numDuplicates++ - level.Debug(r.logger).Log("msg", "Duplicate sample for timestamp", "lset", lset, "sample", s) + level.Debug(tLogger).Log("msg", "Duplicate sample for timestamp", "lset", lset, "value", s.Value, "timestamp", s.Timestamp) case storage.ErrOutOfBounds: numOutOfBounds++ - level.Debug(r.logger).Log("msg", "Out of bounds metric", "lset", lset, "sample", s) + level.Debug(tLogger).Log("msg", "Out of bounds metric", "lset", lset, "value", s.Value, "timestamp", s.Timestamp) } } @@ -100,7 +102,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR if ref != 0 && len(t.Exemplars) > 0 { for _, ex := range t.Exemplars { exLset := labelpb.ZLabelsToPromLabels(ex.Labels) - logger := log.With(r.logger, "exemplarLset", exLset, "exemplar", ex.String()) + logger := log.With(tLogger, "exemplarLset", exLset, "exemplar", ex.String()) _, err = app.AppendExemplar(ref, lset, exemplar.Exemplar{ Labels: exLset, @@ -128,27 +130,27 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR } if numOutOfOrder > 0 { - level.Warn(r.logger).Log("msg", "Error on ingesting out-of-order samples", "numDropped", numOutOfOrder) + level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order samples", "numDropped", numOutOfOrder) errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "add %d samples", numOutOfOrder)) } if numDuplicates > 0 { - level.Warn(r.logger).Log("msg", "Error on ingesting samples with different value but same timestamp", "numDropped", numDuplicates) + level.Warn(tLogger).Log("msg", "Error on ingesting samples with different value but same timestamp", "numDropped", numDuplicates) errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "add %d samples", numDuplicates)) } if numOutOfBounds > 0 { - level.Warn(r.logger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numOutOfBounds) + level.Warn(tLogger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numOutOfBounds) errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "add %d samples", numOutOfBounds)) } if numExemplarsOutOfOrder > 0 { - level.Warn(r.logger).Log("msg", "Error on ingesting out-of-order exemplars", "numDropped", numExemplarsOutOfOrder) + level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order exemplars", "numDropped", numExemplarsOutOfOrder) errs.Add(errors.Wrapf(storage.ErrOutOfOrderExemplar, "add %d exemplars", numExemplarsOutOfOrder)) } if numExemplarsDuplicate > 0 { - level.Warn(r.logger).Log("msg", "Error on ingesting duplicate exemplars", "numDropped", numExemplarsDuplicate) + level.Warn(tLogger).Log("msg", "Error on ingesting duplicate exemplars", "numDropped", numExemplarsDuplicate) errs.Add(errors.Wrapf(storage.ErrDuplicateExemplar, "add %d exemplars", numExemplarsDuplicate)) } if numExemplarsLabelLength > 0 { - level.Warn(r.logger).Log("msg", "Error on ingesting exemplars with label length exceeding maximum limit", "numDropped", numExemplarsLabelLength) + level.Warn(tLogger).Log("msg", "Error on ingesting exemplars with label length exceeding maximum limit", "numDropped", numExemplarsLabelLength) errs.Add(errors.Wrapf(storage.ErrExemplarLabelLength, "add %d exemplars", numExemplarsLabelLength)) }