Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sumologicexporter] Enable metrics pipeline #2117

Merged
merged 11 commits into from
Feb 20, 2021
142 changes: 131 additions & 11 deletions exporter/sumologicexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
)

// sumologicexporter holds the shared state used by both the logs and
// metrics push paths: resolved source templates, exporter configuration,
// the HTTP client, the metadata filter, and the Prometheus-format
// serializer used for metrics.
//
// NOTE(review): the scraped diff interleaved the pre-change and
// post-change field lists; this is the post-change struct with the
// duplicated fields removed.
type sumologicexporter struct {
	sources             sourceFormats
	config              *Config
	client              *http.Client
	filter              filter
	prometheusFormatter prometheusFormatter
}

func initExporter(cfg *Config) (*sumologicexporter, error) {
Expand Down Expand Up @@ -72,16 +73,22 @@ func initExporter(cfg *Config) (*sumologicexporter, error) {
return nil, err
}

pf, err := newPrometheusFormatter()
if err != nil {
return nil, err
}

httpClient, err := cfg.HTTPClientSettings.ToClient()
if err != nil {
return nil, fmt.Errorf("failed to create HTTP Client: %w", err)
}

se := &sumologicexporter{
config: cfg,
sources: sfs,
client: httpClient,
filter: f,
config: cfg,
sources: sfs,
client: httpClient,
filter: f,
prometheusFormatter: pf,
}

return se, nil
Expand All @@ -108,6 +115,27 @@ func newLogsExporter(
)
}

// newMetricsExporter builds a component.MetricsExporter backed by a
// sumologicexporter instance created from cfg. Retry and queue behavior
// come from the configuration; the exporterhelper timeout is disabled
// because the exporter manages its own timeout internally.
func newMetricsExporter(
	cfg *Config,
	params component.ExporterCreateParams,
) (component.MetricsExporter, error) {
	exp, err := initExporter(cfg)
	if err != nil {
		return nil, err
	}

	return exporterhelper.NewMetricsExporter(
		cfg,
		params.Logger,
		exp.pushMetricsData,
		// Timeout handling happens inside the exporter itself, so the
		// exporterhelper-level timeout is turned off (0 = disabled).
		exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
		exporterhelper.WithRetry(cfg.RetrySettings),
		exporterhelper.WithQueue(cfg.QueueSettings),
	)
}

// pushLogsData groups data with common metadata and sends them as separate batched requests.
// It returns the number of unsent logs and an error which contains a list of dropped records
// so they can be handled by OTC retry mechanism
Expand All @@ -124,7 +152,7 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld pdata.Logs) (i
if err != nil {
return 0, consumererror.PartialLogsError(fmt.Errorf("failed to initialize compressor: %w", err), ld)
}
sdr := newSender(se.config, se.client, se.filter, se.sources, c)
sdr := newSender(se.config, se.client, se.filter, se.sources, c, se.prometheusFormatter)

// Iterate over ResourceLogs
rls := ld.ResourceLogs()
Expand Down Expand Up @@ -159,15 +187,15 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld pdata.Logs) (i
errs = append(errs, err)
droppedRecords = append(droppedRecords, dropped...)
}
sdr.cleanBuffer()
sdr.cleanLogsBuffer()
}

// assign metadata
previousMetadata = currentMetadata

// add log to the buffer
var dropped []pdata.LogRecord
dropped, err = sdr.batch(ctx, log, previousMetadata)
dropped, err = sdr.batchLog(ctx, log, previousMetadata)
if err != nil {
droppedRecords = append(droppedRecords, dropped...)
errs = append(errs, err)
Expand Down Expand Up @@ -202,3 +230,95 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld pdata.Logs) (i

return 0, nil
}

// pushMetricsData groups metrics that share common metadata and sends each
// group as a separate batched request. It returns the number of unsent
// metrics together with an error that carries the dropped records, so the
// OTC retry mechanism can resend them.
func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pdata.Metrics) (int, error) {
	var (
		// metadata string of the metric currently being processed
		currentMetadata fields
		// metadata of the metrics already sitting in the sender buffer
		previousMetadata fields
		// accumulated per-batch send errors, combined at the end
		errs []error
		// metric/attribute pairs that failed to send and must be returned
		droppedRecords []metricPair
		attributes     pdata.AttributeMap
	)

	c, err := newCompressor(se.config.CompressEncoding)
	if err != nil {
		return 0, consumererror.PartialMetricsError(fmt.Errorf("failed to initialize compressor: %w", err), md)
	}
	// A fresh sender (with its own buffer) is created per push call.
	sdr := newSender(se.config, se.client, se.filter, se.sources, c, se.prometheusFormatter)

	// Iterate over ResourceMetrics
	rms := md.ResourceMetrics()
	for i := 0; i < rms.Len(); i++ {
		rm := rms.At(i)

		attributes = rm.Resource().Attributes()

		// iterate over InstrumentationLibraryMetrics
		ilms := rm.InstrumentationLibraryMetrics()
		for j := 0; j < ilms.Len(); j++ {
			ilm := ilms.At(j)

			// iterate over Metrics
			ms := ilm.Metrics()
			for k := 0; k < ms.Len(); k++ {
				m := ms.At(k)
				// Pair the metric with its resource attributes so dropped
				// records can be reconstructed into pdata.Metrics later.
				mp := metricPair{
					metric:     m,
					attributes: attributes,
				}

				// NOTE(review): filterIn depends only on the resource-level
				// attributes, so this is loop-invariant within a resource;
				// kept here to preserve the original statement order.
				currentMetadata = sdr.filter.filterIn(attributes)

				// If metadata differs from currently buffered, flush the buffer
				// (the empty-string check skips the flush on the very first metric).
				if currentMetadata.string() != previousMetadata.string() && previousMetadata.string() != "" {
					var dropped []metricPair
					dropped, err = sdr.sendMetrics(ctx, previousMetadata)
					if err != nil {
						errs = append(errs, err)
						droppedRecords = append(droppedRecords, dropped...)
					}
					sdr.cleanMetricBuffer()
				}

				// assign metadata
				previousMetadata = currentMetadata
				var dropped []metricPair
				// add metric to the buffer; batchMetric may itself trigger a
				// send, returning any records dropped in the process
				dropped, err = sdr.batchMetric(ctx, mp, currentMetadata)
				if err != nil {
					droppedRecords = append(droppedRecords, dropped...)
					errs = append(errs, err)
				}
			}
		}
	}

	// Flush pending metrics still buffered after the loops finish
	dropped, err := sdr.sendMetrics(ctx, previousMetadata)
	if err != nil {
		droppedRecords = append(droppedRecords, dropped...)
		errs = append(errs, err)
	}

	if len(droppedRecords) > 0 {
		// Move all dropped records into a fresh pdata.Metrics (one
		// ResourceMetrics per dropped record) so the retry mechanism can
		// resend exactly the failed data.
		droppedMetrics := pdata.NewMetrics()
		rms := droppedMetrics.ResourceMetrics()
		rms.Resize(len(droppedRecords))
		for num, record := range droppedRecords {
			rm := droppedMetrics.ResourceMetrics().At(num)
			record.attributes.CopyTo(rm.Resource().Attributes())

			ilms := rm.InstrumentationLibraryMetrics()
			ilms.Resize(1)
			ilms.At(0).Metrics().Append(record.metric)
		}

		return len(droppedRecords), consumererror.PartialMetricsError(componenterror.CombineErrors(errs), droppedMetrics)
	}

	return 0, nil
}
Loading