Skip to content

Commit

Permalink
Update FileDataProvider to use the new model.Unmarshaler, avoid InternalRep usage
Browse files Browse the repository at this point in the history

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Jun 10, 2021
1 parent 28e0be5 commit 9e492f7
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 107 deletions.
70 changes: 24 additions & 46 deletions testbed/testbed/data_providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,20 @@
package testbed

import (
"io"
"log"
"os"
"path/filepath"
"strconv"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"go.uber.org/atomic"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal"
otlplogscol "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
otlpmetricscol "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
otlptracecol "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
"go.opentelemetry.io/collector/internal/goldendataset"
"go.opentelemetry.io/collector/internal/idutils"
"go.opentelemetry.io/collector/internal/otlp"
)

// DataProvider defines the interface for generators of test data used to drive various end-to-end tests.
Expand Down Expand Up @@ -257,7 +253,9 @@ func (dp *goldenDataProvider) GenerateLogs() (pdata.Logs, bool) {
// expects just a single JSON message in the entire file).
type FileDataProvider struct {
dataItemsGenerated *atomic.Uint64
message proto.Message
logs pdata.Logs
metrics pdata.Metrics
traces pdata.Traces
ItemsPerBatch int
}

Expand All @@ -268,70 +266,50 @@ func NewFileDataProvider(filePath string, dataType config.DataType) (*FileDataPr
if err != nil {
return nil, err
}
var buf []byte
buf, err = io.ReadAll(file)
if err != nil {
return nil, err
}

var message proto.Message
var dataPointCount int

dp := &FileDataProvider{}
// Load the message from the file and count the data points.

switch dataType {
case config.TracesDataType:
var msg otlptracecol.ExportTraceServiceRequest
if err := protobufJSONUnmarshaler.Unmarshal(file, &msg); err != nil {
if dp.traces, err = otlp.NewJSONTracesUnmarshaler().Unmarshal(buf); err != nil {
return nil, err
}
message = &msg

md := pdata.TracesFromInternalRep(internal.TracesFromOtlp(&msg))
dataPointCount = md.SpanCount()

dp.ItemsPerBatch = dp.traces.SpanCount()
case config.MetricsDataType:
var msg otlpmetricscol.ExportMetricsServiceRequest
if err := protobufJSONUnmarshaler.Unmarshal(file, &msg); err != nil {
if dp.metrics, err = otlp.NewJSONMetricsUnmarshaler().Unmarshal(buf); err != nil {
return nil, err
}
message = &msg

md := pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(&msg))
_, dataPointCount = md.MetricAndDataPointCount()

_, dp.ItemsPerBatch = dp.metrics.MetricAndDataPointCount()
case config.LogsDataType:
var msg otlplogscol.ExportLogsServiceRequest
if err := protobufJSONUnmarshaler.Unmarshal(file, &msg); err != nil {
if dp.logs, err = otlp.NewJSONLogsUnmarshaler().Unmarshal(buf); err != nil {
return nil, err
}
message = &msg

md := pdata.LogsFromInternalRep(internal.LogsFromOtlp(&msg))
dataPointCount = md.LogRecordCount()
dp.ItemsPerBatch = dp.logs.LogRecordCount()
}

return &FileDataProvider{
message: message,
ItemsPerBatch: dataPointCount,
}, nil
return dp, nil
}

// SetLoadGeneratorCounters stores the shared counter that tracks how many data
// items have been generated; the Generate* methods increment it per batch.
func (dp *FileDataProvider) SetLoadGeneratorCounters(dataItemsGenerated *atomic.Uint64) {
dp.dataItemsGenerated = dataItemsGenerated
}

// Unmarshaler configuration used for unmarshaling JSON to Protobuf. Use default config.
var protobufJSONUnmarshaler = &jsonpb.Unmarshaler{}

func (dp *FileDataProvider) GenerateTraces() (pdata.Traces, bool) {
// TODO: implement similar to GenerateMetrics.
return pdata.NewTraces(), true
dp.dataItemsGenerated.Add(uint64(dp.ItemsPerBatch))
return dp.traces, false
}

func (dp *FileDataProvider) GenerateMetrics() (pdata.Metrics, bool) {
md := pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(dp.message.(*otlpmetricscol.ExportMetricsServiceRequest)))
_, dataPointCount := md.MetricAndDataPointCount()
dp.dataItemsGenerated.Add(uint64(dataPointCount))
return md, false
dp.dataItemsGenerated.Add(uint64(dp.ItemsPerBatch))
return dp.metrics, false
}

func (dp *FileDataProvider) GenerateLogs() (pdata.Logs, bool) {
// TODO: implement similar to GenerateMetrics.
return pdata.NewLogs(), true
dp.dataItemsGenerated.Add(uint64(dp.ItemsPerBatch))
return dp.logs, false
}
66 changes: 66 additions & 0 deletions testbed/tests/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ package tests
// coded in this file or use scenarios from perf_scenarios.go.

import (
"path"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/testbed/testbed"
)

Expand Down Expand Up @@ -96,3 +102,63 @@ func TestMetric10kDPS(t *testing.T) {
}

}

// TestMetricsFromFile demonstrates usage of NewFileDataProvider to generate load
// using previously recorded data instead of synthetically generated telemetry.
func TestMetricsFromFile(t *testing.T) {
// Results for this run are written under results/<test name>.
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)

// Use metrics previously recorded using "file" exporter and "k8scluster" receiver.
dataProvider, err := testbed.NewFileDataProvider("testdata/k8s-metrics.json", config.MetricsDataType)
assert.NoError(t, err)

options := testbed.LoadOptions{
DataItemsPerSecond: 1_000,
Parallel: 1,
// ItemsPerBatch is based on the data from the file.
ItemsPerBatch: dataProvider.ItemsPerBatch,
}
agentProc := &testbed.ChildProcess{}

// Send and receive OTLP metrics on ephemeral ports chosen at runtime so
// parallel test runs do not collide.
sender := testbed.NewOTLPMetricDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
receiver := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t))

configStr := createConfigYaml(t, sender, receiver, resultDir, nil, nil)
configCleanup, err := agentProc.PrepareConfig(configStr)
require.NoError(t, err)
defer configCleanup()

tc := testbed.NewTestCase(
t,
dataProvider,
sender,
receiver,
agentProc,
&testbed.PerfTestValidator{},
performanceResultsSummary,
)
defer tc.Stop()

// Resource expectations for the agent process under this load
// (presumably CPU% and RAM in MiB — TODO confirm units against testbed.ResourceSpec).
tc.SetResourceLimits(testbed.ResourceSpec{
ExpectedMaxCPU: 120,
ExpectedMaxRAM: 70,
})
tc.StartBackend()
tc.StartAgent("--log-level=debug")

tc.StartLoad(options)

// Let the load generator run for the test's configured duration.
tc.Sleep(tc.Duration)

tc.StopLoad()

// Wait until everything that was sent has arrived at the mock backend
// before validating, so validation does not race in-flight batches.
tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started")
tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() },
"all data items received")

tc.StopAgent()

tc.ValidateData()
}
61 changes: 0 additions & 61 deletions testbed/tests/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/idutils"
"go.opentelemetry.io/collector/testbed/testbed"
Expand Down Expand Up @@ -472,63 +471,3 @@ func TestTraceAttributesProcessor(t *testing.T) {
})
}
}

func TestMetricsFromFile(t *testing.T) {
// This test demonstrates usage of NewFileDataProvider to generate load using
// previously recorded data.

resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)

// Use metrics previously recorded using "file" exporter and "k8scluster" receiver.
dataProvider, err := testbed.NewFileDataProvider("testdata/k8s-metrics.json", config.MetricsDataType)
assert.NoError(t, err)

options := testbed.LoadOptions{
DataItemsPerSecond: 1_000,
Parallel: 1,
// ItemsPerBatch is based on the data from the file.
ItemsPerBatch: dataProvider.ItemsPerBatch,
}
agentProc := &testbed.ChildProcess{}

sender := testbed.NewOTLPMetricDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
receiver := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t))

configStr := createConfigYaml(t, sender, receiver, resultDir, nil, nil)
configCleanup, err := agentProc.PrepareConfig(configStr)
require.NoError(t, err)
defer configCleanup()

tc := testbed.NewTestCase(
t,
dataProvider,
sender,
receiver,
agentProc,
&testbed.PerfTestValidator{},
performanceResultsSummary,
)
defer tc.Stop()

tc.SetResourceLimits(testbed.ResourceSpec{
ExpectedMaxCPU: 120,
ExpectedMaxRAM: 70,
})
tc.StartBackend()
tc.StartAgent("--log-level=debug")

tc.StartLoad(options)

tc.Sleep(tc.Duration)

tc.StopLoad()

tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started")
tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() },
"all data items received")

tc.StopAgent()

tc.ValidateData()
}

0 comments on commit 9e492f7

Please sign in to comment.