From d803c6045a6ca0fee3c6461f94d4dd55fd1d8442 Mon Sep 17 00:00:00 2001
From: Thomas Barker
Date: Thu, 13 Jan 2022 20:36:56 +0000
Subject: [PATCH 1/2] Add batching support to metrics-exporter.

---
 exporter/collector/metricsexporter.go | 202 +++++++++++++++++++++-----
 1 file changed, 162 insertions(+), 40 deletions(-)

diff --git a/exporter/collector/metricsexporter.go b/exporter/collector/metricsexporter.go
index af86afb1c..072b7ff8e 100644
--- a/exporter/collector/metricsexporter.go
+++ b/exporter/collector/metricsexporter.go
@@ -25,6 +25,7 @@ import (
 	"net/url"
 	"path"
 	"strings"
+	"sync"
 	"time"
 	"unicode"
 
@@ -39,6 +40,7 @@ import (
 	metricpb "google.golang.org/genproto/googleapis/api/metric"
 	monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
 	monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
+	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/types/known/anypb"
 	"google.golang.org/protobuf/types/known/timestamppb"
 )
@@ -55,10 +57,22 @@ type MetricsExporter struct {
 	client *monitoring.MetricClient
 	obs    selfObservability
 	mapper metricMapper
-	// A channel that receives metric descriptor and sends them to GCM once.
-	mds chan *metricpb.MetricDescriptor
-	// Channel that is closed when exportMetricDescriptorRunnner goroutine is finished
-	mdsDone chan struct{}
+
+	// tracks the currently running child tasks
+	goroutines sync.WaitGroup
+	// channel for signaling a graceful shutdown
+	shutdownC chan struct{}
+
+	// A channel that receives metric descriptors and sends them to GCM once
+	metricDescriptorC chan *metricpb.MetricDescriptor
+	// Tracks the metric descriptors that have already been sent to GCM
+	mdCache map[string]*metricpb.MetricDescriptor
+
+	// A channel that receives timeserieses and exports them to GCM in batches
+	timeSeriesC chan *monitoringpb.TimeSeries
+	// stores the currently pending batch of timeserieses
+	pendingTimeSerieses []*monitoringpb.TimeSeries
+	batchTimeoutTimer   *time.Timer
 }
 
 // metricMapper is the part that transforms metrics. Separate from MetricsExporter since it has
@@ -74,14 +88,32 @@ const (
 	SummaryPercentileSuffix = "_summary_percentile"
 )
 
+const (
+	// batchTimeout is how long to wait to build a full batch before sending
+	// off what we already have. We set it to 10 seconds because GCM
+	// throttles us to this anyway.
+	batchTimeout = 10 * time.Second
+
+	// The number of timeserieses to send to GCM in a single request. This
+	// is a hard limit in the GCM API, so we never want to exceed 200.
+	sendBatchSize = 200
+)
+
 type labels map[string]string
 
 func (me *MetricsExporter) Shutdown(ctx context.Context) error {
-	close(me.mds)
+	// TODO: pass ctx to goroutines so that we can use its deadline
+	close(me.shutdownC)
+	c := make(chan struct{})
+	go func() {
+		// Wait until all goroutines are done
+		me.goroutines.Wait()
+		close(c)
+	}()
 	select {
-	case <-me.mdsDone:
 	case <-ctx.Done():
-		me.obs.log.Error("Error waiting for async CreateMetricDescriptor calls to finish.", zap.Error(ctx.Err()))
+		me.obs.log.Error("Error waiting for async tasks to finish.", zap.Error(ctx.Err()))
+	case <-c:
 	}
 	return me.client.Close()
 }
@@ -126,22 +158,28 @@ func NewGoogleCloudMetricsExporter(
 		obs:    selfObservability{log: log},
 		mapper: metricMapper{cfg},
 		// We create a buffered channel for metric descriptors.
-		// MetricDescritpors are asycnhronously sent and optimistic.
+		// MetricDescriptors are asynchronously sent and optimistic.
 		// We only get Unit/Description/Display name from them, so it's ok
 		// to drop / conserve resources for sending timeseries.
-		mds:     make(chan *metricpb.MetricDescriptor, cfg.MetricConfig.CreateMetricDescriptorBufferSize),
-		mdsDone: make(chan struct{}),
+		metricDescriptorC: make(chan *metricpb.MetricDescriptor, cfg.MetricConfig.CreateMetricDescriptorBufferSize),
+		mdCache:           make(map[string]*metricpb.MetricDescriptor),
+		timeSeriesC:       make(chan *monitoringpb.TimeSeries),
+		shutdownC:         make(chan struct{}),
 	}
 
 	// Fire up the metric descriptor exporter.
+	mExp.goroutines.Add(1)
 	go mExp.exportMetricDescriptorRunner()
 
+	// Fire up the time series exporter.
+	mExp.goroutines.Add(1)
+	go mExp.exportTimeSeriesRunner()
+
 	return mExp, nil
 }
 
 // PushMetrics calls pushes pdata metrics to GCM, creating metric descriptors if necessary
 func (me *MetricsExporter) PushMetrics(ctx context.Context, m pdata.Metrics) error {
-	timeSeries := make([]*monitoringpb.TimeSeries, 0, m.DataPointCount())
 	rms := m.ResourceMetrics()
 	for i := 0; i < rms.Len(); i++ {
 		rm := rms.At(i)
@@ -156,7 +194,9 @@ func (me *MetricsExporter) PushMetrics(ctx context.Context, m pdata.Metrics) err
 			mes := ilm.Metrics()
 			for k := 0; k < mes.Len(); k++ {
 				metric := mes.At(k)
-				timeSeries = append(timeSeries, me.mapper.metricToTimeSeries(monitoredResource, metricLabels, metric)...)
+				for _, ts := range me.mapper.metricToTimeSeries(monitoredResource, metricLabels, metric) {
+					me.timeSeriesC <- ts
+				}
 
 				// We only send metric descriptors if we're configured *and* we're not sending service timeseries.
 				if me.cfg.MetricConfig.SkipCreateMetricDescriptor || me.cfg.MetricConfig.CreateServiceTimeSeries {
@@ -164,12 +204,13 @@ func (me *MetricsExporter) PushMetrics(ctx context.Context, m pdata.Metrics) err
 				}
 
 				for _, md := range me.mapper.metricDescriptor(metric) {
-					if md != nil {
-						select {
-						case me.mds <- md:
-						default:
-							// Ignore drops, we'll catch descriptor next time around.
-						}
+					if md == nil {
+						continue
+					}
+					select {
+					case me.metricDescriptorC <- md:
+					default:
+						// Ignore drops, we'll catch descriptor next time around.
 					}
 				}
 			}
@@ -177,36 +218,66 @@ func (me *MetricsExporter) PushMetrics(ctx context.Context, m pdata.Metrics) err
 	}
 
 	// TODO: self observability
+
+	return nil
+}
+
+func (me *MetricsExporter) exportPendingTimeSerieses() {
+	ctx := context.Background()
+
+	var sendSize int
+	if len(me.pendingTimeSerieses) < sendBatchSize {
+		sendSize = len(me.pendingTimeSerieses)
+	} else {
+		sendSize = sendBatchSize
+	}
+
+	var ts []*monitoringpb.TimeSeries
+	ts, me.pendingTimeSerieses = me.pendingTimeSerieses[:sendSize], me.pendingTimeSerieses[sendSize:]
+
+	var err error
 	if me.cfg.MetricConfig.CreateServiceTimeSeries {
-		err := me.createServiceTimeSeries(ctx, timeSeries)
-		recordPointCount(ctx, len(timeSeries), m.DataPointCount()-len(timeSeries), err)
-		return err
+		err = me.createServiceTimeSeries(ctx, ts)
+	} else {
+		err = me.createTimeSeries(ctx, ts)
+	}
+
+	var st string
+	s, _ := status.FromError(err)
+	st = statusCodeToString(s)
+
+	recordPointCountDataPoint(ctx, len(ts), st)
+	if err != nil {
+		me.obs.log.Error("could not export time series to GCM", zap.Error(err))
 	}
-	err := me.createTimeSeries(ctx, timeSeries)
-	recordPointCount(ctx, len(timeSeries), m.DataPointCount()-len(timeSeries), err)
-	return err
 }
 
 // Reads metric descriptors from the md channel, and reports them (once) to GCM.
 func (me *MetricsExporter) exportMetricDescriptorRunner() {
-	mdCache := make(map[string]*metricpb.MetricDescriptor)
+	defer me.goroutines.Done()
+
 	// We iterate over all metric descritpors until the channel is closed.
 	// Note: if we get terminated, this will still attempt to export all descriptors
 	// prior to shutdown.
-	for md := range me.mds {
-		// Not yet sent, now we sent it.
-		if mdCache[md.Type] == nil {
-			err := me.exportMetricDescriptor(context.TODO(), md)
-			// TODO: Log-once on error, per metric descriptor?
-			if err != nil {
-				me.obs.log.Error("Unable to send metric descriptor.", zap.Error(err), zap.Any("metric_descriptor", md))
-				continue
+	for {
+		select {
+		case <-me.shutdownC:
+			for {
+				// We are shutting down. Publish all the pending
+				// items on the channel before we stop.
+				select {
+				case md := <-me.metricDescriptorC:
+					me.exportMetricDescriptor(md)
+				default:
+					// Return and continue graceful shutdown.
+					return
+				}
 			}
-			mdCache[md.Type] = md
+
+		case md := <-me.metricDescriptorC:
+			me.exportMetricDescriptor(md)
 		}
-		// TODO: We may want to compare current MD vs. previous and validate no changes.
 	}
-	close(me.mdsDone)
 }
 
 func (me *MetricsExporter) projectName() string {
@@ -215,14 +286,21 @@ func (me *MetricsExporter) projectName() string {
 }
 
 // Helper method to send metric descriptors to GCM.
-func (me *MetricsExporter) exportMetricDescriptor(ctx context.Context, md *metricpb.MetricDescriptor) error {
-	// export
+func (me *MetricsExporter) exportMetricDescriptor(md *metricpb.MetricDescriptor) {
+	if _, exists := me.mdCache[md.Type]; exists {
+		return
+	}
+	me.mdCache[md.Type] = md
+
 	req := &monitoringpb.CreateMetricDescriptorRequest{
 		Name:             me.projectName(),
 		MetricDescriptor: md,
 	}
-	_, err := me.client.CreateMetricDescriptor(ctx, req)
-	return err
+	_, err := me.client.CreateMetricDescriptor(context.Background(), req)
+	if err != nil {
+		// TODO: Log-once on error, per metric descriptor?
+		me.obs.log.Error("Unable to send metric descriptor.", zap.Error(err), zap.Any("metric_descriptor", md))
+	}
 }
 
 // Sends a user-custom-metric timeseries.
@@ -925,3 +1003,47 @@ func mapMetricPointKind(m pdata.Metric) (metricpb.MetricDescriptor_MetricKind, m
 	}
 	return kind, typ
 }
+
+func (me *MetricsExporter) exportTimeSeriesRunner() {
+	defer me.goroutines.Done()
+	me.batchTimeoutTimer = time.NewTimer(batchTimeout)
+	for {
+		select {
+		case <-me.shutdownC:
+			for {
+				// We are shutting down. Publish all the pending
+				// items on the channel before we stop.
+				select {
+				case item := <-me.timeSeriesC:
+					me.processItem(item)
+				default:
+					goto DONE
+				}
+			}
+		DONE:
+			for len(me.pendingTimeSerieses) > 0 {
+				me.exportPendingTimeSerieses()
+			}
+			// Return and continue graceful shutdown.
+			return
+		case item := <-me.timeSeriesC:
+			me.processItem(item)
+		case <-me.batchTimeoutTimer.C:
+			me.batchTimeoutTimer.Reset(batchTimeout)
+			for len(me.pendingTimeSerieses) > 0 {
+				me.exportPendingTimeSerieses()
+			}
+		}
+	}
+}
+
+func (me *MetricsExporter) processItem(ts *monitoringpb.TimeSeries) {
+	me.pendingTimeSerieses = append(me.pendingTimeSerieses, ts)
+	if len(me.pendingTimeSerieses) >= sendBatchSize {
+		if !me.batchTimeoutTimer.Stop() {
+			<-me.batchTimeoutTimer.C
+		}
+		me.batchTimeoutTimer.Reset(batchTimeout)
+		me.exportPendingTimeSerieses()
+	}
+}

From 5b58c0fb9830b67ef0893b0184e26d3b9051148c Mon Sep 17 00:00:00 2001
From: Thomas Barker
Date: Wed, 26 Jan 2022 20:29:37 +0000
Subject: [PATCH 2/2] Retry when we fail to write metric descriptors.

---
 exporter/collector/metricsexporter.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/exporter/collector/metricsexporter.go b/exporter/collector/metricsexporter.go
index a976a04aa..2d014fadb 100644
--- a/exporter/collector/metricsexporter.go
+++ b/exporter/collector/metricsexporter.go
@@ -290,7 +290,6 @@ func (me *MetricsExporter) exportMetricDescriptor(md *metricpb.MetricDescriptor)
 	if _, exists := me.mdCache[md.Type]; exists {
 		return
 	}
-	me.mdCache[md.Type] = md
 
 	req := &monitoringpb.CreateMetricDescriptorRequest{
 		Name:             me.projectName(),
@@ -300,7 +299,11 @@ func (me *MetricsExporter) exportMetricDescriptor(md *metricpb.MetricDescriptor)
 	if err != nil {
 		// TODO: Log-once on error, per metric descriptor?
 		me.obs.log.Error("Unable to send metric descriptor.", zap.Error(err), zap.Any("metric_descriptor", md))
+		return
 	}
+
+	// only cache if we are successful. We want to retry if there is an error
+	me.mdCache[md.Type] = md
 }
 
 // Sends a user-custom-metric timeseries.
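
The batching behaviour these two commits introduce — queue points on an unbuffered channel, flush whenever sendBatchSize (200, the GCM per-request limit) points have accumulated or the 10-second batchTimeout fires, and drain whatever is still queued on shutdown — can be exercised in isolation. The sketch below is not the exporter's code: it is a minimal, self-contained batcher using invented names (batcher, add, flushAll, and a send callback standing in for the Cloud Monitoring client) that shows the same timer-reset and drain-on-shutdown logic under those assumptions.

package main

import (
	"fmt"
	"sync"
	"time"
)

// batcher collects items and flushes them in groups of at most batchSize,
// or after flushEvery elapses, whichever comes first.
type batcher struct {
	in         chan int
	shutdownC  chan struct{}
	goroutines sync.WaitGroup
	pending    []int
	timer      *time.Timer
	batchSize  int
	flushEvery time.Duration
	send       func(batch []int) // stand-in for the real export call
}

func newBatcher(batchSize int, flushEvery time.Duration, send func([]int)) *batcher {
	b := &batcher{
		in:         make(chan int),
		shutdownC:  make(chan struct{}),
		batchSize:  batchSize,
		flushEvery: flushEvery,
		send:       send,
	}
	b.goroutines.Add(1)
	go b.run()
	return b
}

func (b *batcher) run() {
	defer b.goroutines.Done()
	b.timer = time.NewTimer(b.flushEvery)
	for {
		select {
		case <-b.shutdownC:
			// Drain anything still queued, flush it, then exit.
			for {
				select {
				case item := <-b.in:
					b.add(item)
				default:
					b.flushAll()
					return
				}
			}
		case item := <-b.in:
			b.add(item)
		case <-b.timer.C:
			b.timer.Reset(b.flushEvery)
			b.flushAll()
		}
	}
}

func (b *batcher) add(item int) {
	b.pending = append(b.pending, item)
	if len(b.pending) >= b.batchSize {
		// Full batch ready: flush now and restart the timer so the next
		// partial batch gets a full interval to fill up.
		if !b.timer.Stop() {
			<-b.timer.C
		}
		b.timer.Reset(b.flushEvery)
		b.flushAll()
	}
}

func (b *batcher) flushAll() {
	for len(b.pending) > 0 {
		n := b.batchSize
		if len(b.pending) < n {
			n = len(b.pending)
		}
		batch := b.pending[:n]
		b.pending = b.pending[n:]
		b.send(batch)
	}
}

func (b *batcher) shutdown() {
	close(b.shutdownC)
	b.goroutines.Wait()
}

func main() {
	b := newBatcher(200, 10*time.Second, func(batch []int) {
		fmt.Printf("sending %d items\n", len(batch))
	})
	for i := 0; i < 450; i++ {
		b.in <- i
	}
	b.shutdown() // flushes the remaining 50 items
}

The Stop/drain/Reset sequence in add mirrors processItem in the patch: if the timer has already fired but its tick has not been consumed, resetting it without draining the channel would leave a stale tick behind and trigger an immediate, nearly empty flush right after a full batch was sent.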