Skip to content

Commit

Permalink
Fix a few readability issues on the prometheusremotewriteexporter (#2279
Browse files Browse the repository at this point in the history
)

A few changes:
- Removal of arbitrary whitespace
- Fixing the package godoc
- Use of canonical variable names such as req/resp

And more.
  • Loading branch information
rakyll authored Dec 11, 2020
1 parent e1a6806 commit 4cff93d
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 33 deletions.
7 changes: 2 additions & 5 deletions exporter/prometheusremotewriteexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,9 @@ func Test_loadConfig(t *testing.T) {
},
Insecure: false,
},
ReadBufferSize: 0,

ReadBufferSize: 0,
WriteBufferSize: 512 * 1024,

Timeout: 5 * time.Second,

Timeout: 5 * time.Second,
Headers: map[string]string{
"prometheus-remote-write-version": "0.1.0",
"x-scope-orgid": "234"},
Expand Down
39 changes: 18 additions & 21 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Note: implementation for this class is in a separate PR
// Package prometheusremotewriteexporter implements an exporter that sends Prometheus remote write requests.
package prometheusremotewriteexporter

import (
Expand Down Expand Up @@ -43,7 +43,7 @@ const (
maxBatchByteSize = 3000000
)

// PrwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint
// PrwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
type PrwExporter struct {
namespace string
externalLabels map[string]string
Expand All @@ -56,7 +56,6 @@ type PrwExporter struct {
// NewPrwExporter initializes a new PrwExporter instance and sets fields accordingly.
// client parameter cannot be nil.
func NewPrwExporter(namespace string, endpoint string, client *http.Client, externalLabels map[string]string) (*PrwExporter, error) {

if client == nil {
return nil, errors.New("http client cannot be nil")
}
Expand Down Expand Up @@ -95,6 +94,7 @@ func (prwe *PrwExporter) Shutdown(context.Context) error {
func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int, error) {
prwe.wg.Add(1)
defer prwe.wg.Done()

select {
case <-prwe.closeChan:
return md.MetricCount(), errors.New("shutdown has been called")
Expand Down Expand Up @@ -185,7 +185,6 @@ func validateAndSanitizeExternalLabels(externalLabels map[string]string) (map[st
// its corresponding TimeSeries in tsMap.
// tsMap and metric cannot be nil, and metric must have a non-nil descriptor
func (prwe *PrwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {

switch metric.Data.(type) {
// int points
case *otlp.Metric_DoubleGauge:
Expand Down Expand Up @@ -224,7 +223,6 @@ func (prwe *PrwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries,
// bucket of every data point as a Sample, and adding each Sample to its corresponding TimeSeries.
// tsMap and metric cannot be nil.
func (prwe *PrwExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {

switch metric.Data.(type) {
case *otlp.Metric_IntHistogram:
if metric.GetIntHistogram().GetDataPoints() == nil {
Expand Down Expand Up @@ -300,29 +298,29 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti
return errs
}

func (prwe *PrwExporter) execute(ctx context.Context, req *prompb.WriteRequest) error {
func (prwe *PrwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error {
// Uses proto.Marshal to convert the WriteRequest into bytes array
data, err := proto.Marshal(req)
data, err := proto.Marshal(writeReq)
if err != nil {
return consumererror.Permanent(err)
}
buf := make([]byte, len(data), cap(data))
compressedData := snappy.Encode(buf, data)

// Create the HTTP POST request to send to the endpoint
httpReq, err := http.NewRequestWithContext(ctx, "POST", prwe.endpointURL.String(), bytes.NewReader(compressedData))
req, err := http.NewRequestWithContext(ctx, "POST", prwe.endpointURL.String(), bytes.NewReader(compressedData))
if err != nil {
return consumererror.Permanent(err)
}

// Add necessary headers specified by:
// https://cortexmetrics.io/docs/apis/#remote-api
httpReq.Header.Add("Content-Encoding", "snappy")
httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
httpReq.Header.Set("User-Agent", "OpenTelemetry-Collector/"+version.Version)
req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
req.Header.Set("User-Agent", "OpenTelemetry-Collector/"+version.Version)

httpResp, err := prwe.client.Do(httpReq)
resp, err := prwe.client.Do(req)
if err != nil {
return consumererror.Permanent(err)
}
Expand All @@ -331,18 +329,17 @@ func (prwe *PrwExporter) execute(ctx context.Context, req *prompb.WriteRequest)
// 5xx errors are recoverable and the exporter should retry
// Reference for different behavior according to status code:
// https://github.com/prometheus/prometheus/pull/2552/files#diff-ae8db9d16d8057358e49d694522e7186
if httpResp.StatusCode/100 != 2 {
scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 256))
line := ""
if resp.StatusCode/100 != 2 {
scanner := bufio.NewScanner(io.LimitReader(resp.Body, 256))
var line string
if scanner.Scan() {
line = scanner.Text()
}
errMsg := "server returned HTTP status " + httpResp.Status + ": " + line
if httpResp.StatusCode >= 500 && httpResp.StatusCode < 600 {
return errors.New(errMsg)
err := fmt.Errorf("server returned HTTP status %v: %v ", resp.Status, line)
if resp.StatusCode >= 500 && resp.StatusCode < 600 {
return err
}
return consumererror.Permanent(errors.New(errMsg))

return consumererror.Permanent(err)
}
return nil
}
2 changes: 0 additions & 2 deletions exporter/prometheusremotewriteexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,11 @@ func createMetricsExporter(_ context.Context, params component.ExporterCreatePar
}

client, err := prwCfg.HTTPClientSettings.ToClient()

if err != nil {
return nil, err
}

prwe, err := NewPrwExporter(prwCfg.Namespace, prwCfg.HTTPClientSettings.Endpoint, client, prwCfg.ExternalLabels)

if err != nil {
return nil, err
}
Expand Down
3 changes: 0 additions & 3 deletions exporter/prometheusremotewriteexporter/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ func timeSeriesSignature(metric *otlp.Metric, labels *[]prompb.Label) string {
// Unpaired string value is ignored. String pairs overwrites OTLP labels if collision happens, and the overwrite is
// logged. Resultant label names are sanitized.
func createLabelSet(labels []common.StringKeyValue, externalLabels map[string]string, extras ...string) []prompb.Label {

// map ensures no duplicate label name
l := map[string]prompb.Label{}

Expand Down Expand Up @@ -165,7 +164,6 @@ func createLabelSet(labels []common.StringKeyValue, externalLabels map[string]st
}

s := make([]prompb.Label, 0, len(l))

for _, lb := range l {
s = append(s, lb)
}
Expand All @@ -176,7 +174,6 @@ func createLabelSet(labels []common.StringKeyValue, externalLabels map[string]st
// getPromMetricName creates a Prometheus metric name by attaching namespace prefix, and _total suffix for Monotonic
// metrics.
func getPromMetricName(metric *otlp.Metric, ns string) string {

if metric == nil {
return ""
}
Expand Down
2 changes: 0 additions & 2 deletions exporter/prometheusremotewriteexporter/testutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,10 +571,8 @@ func getQuantiles(bounds []float64, values []float64) []*otlp.DoubleSummaryDataP

func getTimeseriesMap(timeseries []*prompb.TimeSeries) map[string]*prompb.TimeSeries {
tsMap := make(map[string]*prompb.TimeSeries)

for i, v := range timeseries {
tsMap[fmt.Sprintf("%s%d", "timeseries_name", i)] = v
}

return tsMap
}

0 comments on commit 4cff93d

Please sign in to comment.