Skip to content

Commit

Permalink
exporter/prometheusremotewriter: sort Sample by Timestamp to avoid ou…
Browse files Browse the repository at this point in the history
…t of order errors

Ensures that, before a prompb.WriteRequest is created, the
TimeSeries it contains have their Sample values sorted chronologically
by Timestamp, to avoid the out-of-order errors that Prometheus reports
when it rejects such samples.

Thanks to @rakyll for a reproducer and for diagnosing the problem, which
helped distill the issue from a very complex setup that required super
expensive Kubernetes clusters with many replicas, but essentially the problem
became more apparent as the number of TimeSeries grew.

It is important to note that the presence of such a bug means that,
when a large number of replicas are being scraped, scraping stalls
and takes a long time, which in turn means that targets scraped in a
round-robin fashion experience staleness when there are many of them.
This might be one more reason for setups to adopt a push model as
opposed to scrape endpoints.

Fixes open-telemetry#2315
Fixes open-telemetry/prometheus-interoperability-spec#10
  • Loading branch information
odeke-em committed Apr 14, 2021
1 parent 671bf0a commit bb44498
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 1 deletion.
19 changes: 18 additions & 1 deletion exporter/prometheusremotewriteexporter/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,12 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int)
sizeOfCurrentBatch = 0
}

// First things first, we need to ensure that the samples per time series are sorted by Timestamp.
// This ensures that we don't produce out of order problems
// See:
// * https://github.com/open-telemetry/wg-prometheus/issues/10
// * https://github.com/open-telemetry/opentelemetry-collector/issues/2315

tsArray = append(tsArray, *v)
sizeOfCurrentBatch += sizeOfSeries
}
Expand Down Expand Up @@ -476,10 +482,21 @@ func addSingleDoubleSummaryDataPoint(pt *otlp.DoubleSummaryDataPoint, metric *ot
}
}

// orderBySampleTimestamp sorts the Samples of every TimeSeries in tsArray
// in ascending Timestamp order. The sort is performed in place, so the
// elements of the caller's slice are modified; the same slice is returned.
func orderBySampleTimestamp(tsArray []prompb.TimeSeries) []prompb.TimeSeries {
	for idx := range tsArray {
		series := &tsArray[idx]
		// sort.Slice reorders the existing backing array, so the series
		// observes the sorted samples without any reassignment.
		sort.Slice(series.Samples, func(a, b int) bool {
			return series.Samples[a].Timestamp < series.Samples[b].Timestamp
		})
	}
	return tsArray
}

// convertTimeseriesToRequest wraps tsArray in a prompb.WriteRequest.
// Before the request is built, the samples of each series are ordered by
// Timestamp through orderBySampleTimestamp, which sorts tsArray's series
// in place.
func convertTimeseriesToRequest(tsArray []prompb.TimeSeries) *prompb.WriteRequest {
	// The remote_write endpoint only requires the timeseries.
	// OTLP defines its own way to handle metric metadata.
	return &prompb.WriteRequest{
		Timeseries: orderBySampleTimestamp(tsArray),
	}
}
113 changes: 113 additions & 0 deletions exporter/prometheusremotewriteexporter/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,3 +367,116 @@ func Test_batchTimeSeries(t *testing.T) {
})
}
}

// TestEnsureTimeseriesPointsAreSortedByTimestamp ensures that, before a
// prompb.WriteRequest is created, the points of each TimeSeries are sorted by
// Timestamp value, to prevent Prometheus from rejecting poorly sorted values.
// See issues:
// * https://github.com/open-telemetry/wg-prometheus/issues/10
// * https://github.com/open-telemetry/opentelemetry-collector/issues/2315
func TestEnsureTimeseriesPointsAreSortedByTimestamp(t *testing.T) {
	outOfOrder := []prompb.TimeSeries{
		{
			Samples: []prompb.Sample{
				{Value: 10.11, Timestamp: 1000},
				{Value: 7.81, Timestamp: 2},
				{Value: 987.81, Timestamp: 1},
				{Value: 18.22, Timestamp: 999},
			},
		},
		{
			Samples: []prompb.Sample{
				{Value: 99.91, Timestamp: 5},
				{Value: 4.33, Timestamp: 3},
				{Value: 47.81, Timestamp: 4},
				{Value: 18.22, Timestamp: 8},
			},
		},
	}
	got := convertTimeseriesToRequest(outOfOrder)

	// We must ensure that the resulting Timeseries' sample points are sorted by Timestamp.
	want := &prompb.WriteRequest{
		Timeseries: []prompb.TimeSeries{
			{
				Samples: []prompb.Sample{
					{Value: 987.81, Timestamp: 1},
					{Value: 7.81, Timestamp: 2},
					{Value: 18.22, Timestamp: 999},
					{Value: 10.11, Timestamp: 1000},
				},
			},
			{
				Samples: []prompb.Sample{
					{Value: 4.33, Timestamp: 3},
					{Value: 47.81, Timestamp: 4},
					{Value: 99.91, Timestamp: 5},
					{Value: 18.22, Timestamp: 8},
				},
			},
		},
	}
	// testify's assert.Equal takes (t, expected, actual); passing them in
	// that order keeps the failure output correctly labeled.
	assert.Equal(t, want, got)

	// For a full sanity/logical check, assert that NO Sample has a
	// Timestamp smaller than that of any Sample preceding it.
	for ti, ts := range got.Timeseries {
		for i := range ts.Samples {
			si := ts.Samples[i]
			for j := 0; j < i; j++ {
				sj := ts.Samples[j]
				if sj.Timestamp > si.Timestamp {
					t.Errorf("Timeseries[%d]: Sample[%d].Timestamp(%d) > Sample[%d].Timestamp(%d)",
						ti, j, sj.Timestamp, i, si.Timestamp)
				}
			}
		}
	}
}

0 comments on commit bb44498

Please sign in to comment.