Skip to content

Commit

Permalink
make some functions public (#1702)
Browse files Browse the repository at this point in the history
update design link
  • Loading branch information
huyan0 authored Sep 1, 2020
1 parent fb85cc1 commit 274fb4e
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 32 deletions.
2 changes: 1 addition & 1 deletion exporter/prometheusremotewriteexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ prometheusremotewrite:
The full list of settings exposed for this exporter are documented [here](./config.go)
with detailed sample configurations [here](./testdata/config.yaml).
_Here is a link to the overall project [design](https://github.com/open-telemetry/opentelemetry-collector/pull/1464)_
_Here is a link to the overall project [design](./DESIGN.md)_
24 changes: 12 additions & 12 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@ import (
"go.opentelemetry.io/collector/internal/dataold"
)

// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint
type prwExporter struct {
// PrwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint
type PrwExporter struct {
namespace string
endpointURL *url.URL
client *http.Client
wg *sync.WaitGroup
closeChan chan struct{}
}

// newPrwExporter initializes a new prwExporter instance and sets fields accordingly.
// NewPrwExporter initializes a new PrwExporter instance and sets fields accordingly.
// client parameter cannot be nil.
func newPrwExporter(namespace string, endpoint string, client *http.Client) (*prwExporter, error) {
func NewPrwExporter(namespace string, endpoint string, client *http.Client) (*PrwExporter, error) {

if client == nil {
return nil, errors.New("http client cannot be nil")
Expand All @@ -60,7 +60,7 @@ func newPrwExporter(namespace string, endpoint string, client *http.Client) (*pr
return nil, errors.New("invalid endpoint")
}

return &prwExporter{
return &PrwExporter{
namespace: namespace,
endpointURL: endpointURL,
client: client,
Expand All @@ -69,18 +69,18 @@ func newPrwExporter(namespace string, endpoint string, client *http.Client) (*pr
}, nil
}

// shutdown stops the exporter from accepting incoming calls(and return error), and wait for current export operations
// Shutdown stops the exporter from accepting incoming calls(and return error), and wait for current export operations
// to finish before returning
func (prwe *prwExporter) shutdown(context.Context) error {
func (prwe *PrwExporter) Shutdown(context.Context) error {
close(prwe.closeChan)
prwe.wg.Wait()
return nil
}

// pushMetrics converts metrics to Prometheus remote write TimeSeries and send to remote endpoint. It maintain a map of
// PushMetrics converts metrics to Prometheus remote write TimeSeries and send to remote endpoint. It maintain a map of
// TimeSeries, validates and handles each individual metric, adding the converted TimeSeries to the map, and finally
// exports the map.
func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int, error) {
func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int, error) {
prwe.wg.Add(1)
defer prwe.wg.Done()
select {
Expand Down Expand Up @@ -149,7 +149,7 @@ func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int
// handleScalarMetric processes data points in a single OTLP scalar metric by adding the each point as a Sample into
// its corresponding TimeSeries in tsMap.
// tsMap and metric cannot be nil, and metric must have a non-nil descriptor
func (prwe *prwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {
func (prwe *PrwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {

mType := metric.MetricDescriptor.Type

Expand Down Expand Up @@ -199,7 +199,7 @@ func (prwe *prwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries,
// handleHistogramMetric processes data points in a single OTLP histogram metric by mapping the sum, count and each
// bucket of every data point as a Sample, and adding each Sample to its corresponding TimeSeries.
// tsMap and metric cannot be nil.
func (prwe *prwExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {
func (prwe *PrwExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {

if metric.HistogramDataPoints == nil {
return fmt.Errorf("nil data point field in metric %s", metric.GetMetricDescriptor().Name)
Expand Down Expand Up @@ -258,7 +258,7 @@ func (prwe *prwExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSeri
}

// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
func (prwe *prwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error {
func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error {
//Calls the helper function to convert the TsMap to the desired format
req, err := wrapTimeSeries(tsMap)
if err != nil {
Expand Down
32 changes: 16 additions & 16 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func Test_handleScalarMetric(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tsMap := map[string]*prompb.TimeSeries{}
prw := &prwExporter{}
prw := &PrwExporter{}
ok := prw.handleScalarMetric(tsMap, tt.m)
if tt.returnError {
assert.Error(t, ok)
Expand Down Expand Up @@ -233,7 +233,7 @@ func Test_handleHistogramMetric(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tsMap := map[string]*prompb.TimeSeries{}
prw := &prwExporter{}
prw := &PrwExporter{}
ok := prw.handleHistogramMetric(tsMap, &tt.m)
if tt.returnError {
assert.Error(t, ok)
Expand All @@ -249,8 +249,8 @@ func Test_handleHistogramMetric(t *testing.T) {
}
}

// Test_ newPrwExporter checks that a new exporter instance with non-nil fields is initialized
func Test_newPrwExporter(t *testing.T) {
// Test_ NewPrwExporter checks that a new exporter instance with non-nil fields is initialized
func Test_NewPrwExporter(t *testing.T) {
config := &Config{
ExporterSettings: configmodels.ExporterSettings{},
TimeoutSettings: exporterhelper.TimeoutSettings{},
Expand Down Expand Up @@ -295,7 +295,7 @@ func Test_newPrwExporter(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
prwe, err := newPrwExporter(tt.namespace, tt.endpoint, tt.client)
prwe, err := NewPrwExporter(tt.namespace, tt.endpoint, tt.client)
if tt.returnError {
assert.Error(t, err)
return
Expand All @@ -310,22 +310,22 @@ func Test_newPrwExporter(t *testing.T) {
}
}

// Test_shutdown checks after shutdown is called, incoming calls to pushMetrics return error.
func Test_shutdown(t *testing.T) {
prwe := &prwExporter{
// Test_Shutdown checks after Shutdown is called, incoming calls to PushMetrics return error.
func Test_Shutdown(t *testing.T) {
prwe := &PrwExporter{
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
}
wg := new(sync.WaitGroup)
errChan := make(chan error, 5)
err := prwe.shutdown(context.Background())
err := prwe.Shutdown(context.Background())
require.NoError(t, err)
errChan = make(chan error, 5)
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, ok := prwe.pushMetrics(context.Background(),
_, ok := prwe.PushMetrics(context.Background(),
pdatautil.MetricsFromOldInternalMetrics(testdataold.GenerateMetricDataEmpty()))
errChan <- ok
}()
Expand All @@ -339,7 +339,7 @@ func Test_shutdown(t *testing.T) {

//Test whether or not the Server receives the correct TimeSeries.
//Currently considering making this test an iterative for loop of multiple TimeSeries
//Much akin to Test_pushMetrics
//Much akin to Test_PushMetrics
func Test_export(t *testing.T) {
//First we will instantiate a dummy TimeSeries instance to pass into both the export call and compare the http request
labels := getPromLabels(label11, value11, label12, value12, label21, value21, label22, value22)
Expand Down Expand Up @@ -432,17 +432,17 @@ func runExportPipeline(t *testing.T, ts *prompb.TimeSeries, endpoint *url.URL) e

HTTPClient := http.DefaultClient
//after this, instantiate a CortexExporter with the current HTTP client and endpoint set to passed in endpoint
prwe, err := newPrwExporter("test", endpoint.String(), HTTPClient)
prwe, err := NewPrwExporter("test", endpoint.String(), HTTPClient)
if err != nil {
return err
}
err = prwe.export(context.Background(), testmap)
return err
}

// Test_pushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as
// Test_PushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as
// expected
func Test_pushMetrics(t *testing.T) {
func Test_PushMetrics(t *testing.T) {

noTempBatch := pdatautil.MetricsFromOldInternalMetrics((testdataold.GenerateMetricDataManyMetricsSameResource(10)))
invalidTypeBatch := pdatautil.MetricsFromOldInternalMetrics((testdataold.GenerateMetricDataMetricTypeInvalid()))
Expand Down Expand Up @@ -693,9 +693,9 @@ func Test_pushMetrics(t *testing.T) {
// c, err := config.HTTPClientSettings.ToClient()
// assert.Nil(t, err)
c := http.DefaultClient
prwe, nErr := newPrwExporter(config.Namespace, serverURL.String(), c)
prwe, nErr := NewPrwExporter(config.Namespace, serverURL.String(), c)
require.NoError(t, nErr)
numDroppedTimeSeries, err := prwe.pushMetrics(context.Background(), *tt.md)
numDroppedTimeSeries, err := prwe.PushMetrics(context.Background(), *tt.md)
assert.Equal(t, tt.numDroppedTimeSeries, numDroppedTimeSeries)
if tt.returnErr {
assert.Error(t, err)
Expand Down
6 changes: 3 additions & 3 deletions exporter/prometheusremotewriteexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,19 @@ func createMetricsExporter(_ context.Context, _ component.ExporterCreateParams,
return nil, err
}

prwe, err := newPrwExporter(prwCfg.Namespace, prwCfg.HTTPClientSettings.Endpoint, client)
prwe, err := NewPrwExporter(prwCfg.Namespace, prwCfg.HTTPClientSettings.Endpoint, client)

if err != nil {
return nil, err
}

prwexp, err := exporterhelper.NewMetricsExporter(
cfg,
prwe.pushMetrics,
prwe.PushMetrics,
exporterhelper.WithTimeout(prwCfg.TimeoutSettings),
exporterhelper.WithQueue(prwCfg.QueueSettings),
exporterhelper.WithRetry(prwCfg.RetrySettings),
exporterhelper.WithShutdown(prwe.shutdown),
exporterhelper.WithShutdown(prwe.Shutdown),
)

return prwexp, err
Expand Down

0 comments on commit 274fb4e

Please sign in to comment.