Skip to content

Commit

Permalink
Modify kinesisexporter to support metrics/traces to otlp_proto encodi…
Browse files Browse the repository at this point in the history
…ng (#9)

* Add OTLP marshalling support for kinesis exporter (#1)

Adding a Marshaller interface that allows encoding to different export formats
Add an OTLP marshaller with support for metrics

Co-authored-by: Iris Grace Endozo <iendozo@atlassian.com>

* Swapped kinesis opencensus producer with omnition producer

* Moved AWS/Kinesis setup to exporter.go

* Delete unused Message struct (#3)

Co-authored-by: Iris Grace Endozo <iendozo@atlassian.com>

* Integrated marshaller into trace path

* Adding tests for exporter and trace marshaling

* Support sending out metrics to kinesis (#5)

Co-authored-by: Iris Grace Endozo <iendozo@atlassian.com>

* Added Start function to factory and logging of producer failures

* Removed var

* Changed order of config variables

* Increase test coverage + some cleanups (#8)

* Add tests + private methods

* Add comments + fixes

Co-authored-by: Iris Grace Endozo <iendozo@atlassian.com>

* Add README.mdgst

* Added parallel testing and import order fixes

* Added pointer receivers

* Opentelemetry to OpenTelemetry

* Unused imports

* Add license and fix import order

* Fixed license

* Fix linting shadowing error

* Change readme descriptions

* Use sensible values in example config

* Added context validation in exporter

* Move invalid context as const

Co-authored-by: Iris Grace Endozo <iendozo@atlassian.com>
Co-authored-by: Raymond Wang <rwang2@atlassian.com>
  • Loading branch information
3 people authored Oct 18, 2020
1 parent 8d6f04c commit 2c33fbb
Show file tree
Hide file tree
Showing 16 changed files with 644 additions and 118 deletions.
47 changes: 46 additions & 1 deletion exporter/kinesisexporter/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,48 @@
# Kinesis Exporter

To be added.
Kinesis exporter exports OpenTelemetry data to Kinesis. This exporter uses a [KPL][kpl-url]-like batch producer and uses
the same aggregation format that KPLs use. Message payload encoding is configurable.

The following settings can be optionally configured:
- `aws` contains AWS specific configuration
- `stream_name` (default = test-stream): The name of the Kinesis stream where events are sent/pushed
- `kinesis_endpoint`: The Kinesis endpoint if role is not being assumed
- `region` (default = us-west-2): The AWS region where the Kinesis stream is defined
- `role`: The Kinesis role to assume
- `kpl` contains kinesis producer library related config to controls things like aggregation, batching, connections, retries, etc
- `aggregate_batch_count` (default = 4294967295): Determines the maximum number of items to pack into an aggregated record. Must not exceed 4294967295
- `aggregate_batch_size` (default = 51200): Determines the maximum number of bytes to pack into an aggregated record. User records larger than this will bypass aggregation
- `batch_size` (default = 5242880): Determines the maximum number of bytes to send with a PutRecords request. Must not exceed 5MiB
- `batch_count` (default = 1000): Determines the maximum number of items to pack in the batch. Must not exceed 1000
- `backlog_count` (default = 2000): Determines the channel capacity before Put() will begin blocking. Default to `BatchCount`
- `flush_interval_seconds` (default = 5): The regular interval for flushing the kinesis producer buffer
- `max_connections` (default = 24): Number of requests to send concurrently
- `max_retries` (default = 10): Number of retry attempts to make before dropping records
- `max_backoff_seconds` (default = 60): Maximum time to backoff. Must be greater than 1s
- `encoding` (default = otlp_proto): The encoding of the payload sent to Kinesis. Available encodings:
- `otlp_proto`: the payload is serialized to otlp proto bytes

Example configuration:

```yaml
exporters:
kinesis:
encoding: "otlp_proto"
aws:
stream_name: test-stream
region: mars-1
role: arn:test-role
kinesis_endpoint: kinesis.mars-1.aws.galactic
kpl:
aggregate_batch_count: 4294967295
aggregate_batch_size: 51200
batch_size: 5242880
batch_count: 1000
backlog_count: 2000
flush_interval_seconds: 5
max_connections: 24
max_retries: 10
max_backoff_seconds: 60
```
[kpl-url]: https://github.com/awslabs/amazon-kinesis-producer
6 changes: 1 addition & 5 deletions exporter/kinesisexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,5 @@ type Config struct {
AWS AWSConfig `mapstructure:"aws"`
KPL KPLConfig `mapstructure:"kpl"`

QueueSize int `mapstructure:"queue_size"`
NumWorkers int `mapstructure:"num_workers"`
MaxBytesPerBatch int `mapstructure:"max_bytes_per_batch"`
MaxBytesPerSpan int `mapstructure:"max_bytes_per_span"`
FlushIntervalSeconds int `mapstructure:"flush_interval_seconds"`
Encoding string `mapstructure:"encoding"`
}
21 changes: 7 additions & 14 deletions exporter/kinesisexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
)

func TestDefaultConfig(t *testing.T) {
t.Parallel()
factories, err := componenttest.ExampleComponents()
assert.Nil(t, err)

Expand All @@ -47,7 +48,8 @@ func TestDefaultConfig(t *testing.T) {
NameVal: "kinesis",
},
AWS: AWSConfig{
Region: "us-west-2",
Region: "us-west-2",
StreamName: "test-stream",
},
KPL: KPLConfig{
BatchSize: 5242880,
Expand All @@ -56,20 +58,15 @@ func TestDefaultConfig(t *testing.T) {
FlushIntervalSeconds: 5,
MaxConnections: 24,
},

QueueSize: 100000,
NumWorkers: 8,
FlushIntervalSeconds: 5,
MaxBytesPerBatch: 100000,
MaxBytesPerSpan: 900000,
Encoding: defaultEncoding,
},
)
}

func TestConfig(t *testing.T) {
t.Parallel()
factories, err := componenttest.ExampleComponents()
assert.Nil(t, err)

factory := NewFactory()
factories.Exporters[factory.Type()] = factory
cfg, err := configtest.LoadConfigFile(
Expand Down Expand Up @@ -104,17 +101,13 @@ func TestConfig(t *testing.T) {
MaxRetries: 17,
MaxBackoffSeconds: 18,
},

QueueSize: 1,
NumWorkers: 2,
FlushIntervalSeconds: 3,
MaxBytesPerBatch: 4,
MaxBytesPerSpan: 5,
Encoding: "",
},
)
}

func TestConfigCheck(t *testing.T) {
t.Parallel()
cfg := (NewFactory()).CreateDefaultConfig()
assert.NoError(t, configcheck.ValidateConfig(cfg))
}
107 changes: 75 additions & 32 deletions exporter/kinesisexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,57 +16,100 @@ package kinesisexporter

import (
"context"
"fmt"

kinesis "github.com/signalfx/opencensus-go-exporter-kinesis"
"github.com/google/uuid"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger"
"go.uber.org/zap"
)

// Exporter implements an OpenTelemetry trace exporter that exports all spans to AWS Kinesis
type Exporter struct {
kinesis *kinesis.Exporter
logger *zap.Logger
const (
errInvalidContext = "invalid context"
)

// exporter implements an OpenTelemetry exporter that pushes OpenTelemetry data to AWS Kinesis
type exporter struct {
producer producer
logger *zap.Logger
marshaller Marshaller
}

var _ component.TraceExporter = (*Exporter)(nil)
// newExporter creates a new exporter with the passed in configurations.
// It starts the AWS session and setups the relevant connections.
func newExporter(c *Config, logger *zap.Logger) (*exporter, error) {
// Get marshaller based on config
marshaller := defaultMarshallers()[c.Encoding]
if marshaller == nil {
return nil, fmt.Errorf("unrecognized encoding")
}

// Start tells the exporter to start. The exporter may prepare for exporting
pr, err := newKinesisProducer(c, logger)
if err != nil {
return nil, err
}

return &exporter{producer: pr, marshaller: marshaller, logger: logger}, nil
}

// start tells the exporter to start. The exporter may prepare for exporting
// by connecting to the endpoint. Host parameter can be used for communicating
// with the host after Start() has already returned. If error is returned by
// Start() then the collector startup will be aborted.
func (e Exporter) Start(_ context.Context, _ component.Host) error {
// with the host after start() has already returned. If error is returned by
// start() then the collector startup will be aborted.
func (e *exporter) start(ctx context.Context, _ component.Host) error {
if ctx == nil || ctx.Err() != nil {
return fmt.Errorf(errInvalidContext)
}

e.producer.start()
return nil
}

// Shutdown is invoked during exporter shutdown.
func (e Exporter) Shutdown(context.Context) error {
e.kinesis.Flush()
// shutdown is invoked during exporter shutdown
func (e *exporter) shutdown(ctx context.Context) error {
if ctx == nil || ctx.Err() != nil {
return fmt.Errorf(errInvalidContext)
}

e.producer.stop()
return nil
}

// ConsumeTraceData receives a span batch and exports it to AWS Kinesis
func (e Exporter) ConsumeTraces(_ context.Context, td pdata.Traces) error {
pBatches, err := jaegertranslator.InternalTracesToJaegerProto(td)
func (e *exporter) pushTraces(ctx context.Context, td pdata.Traces) (int, error) {
if ctx == nil || ctx.Err() != nil {
return 0, fmt.Errorf(errInvalidContext)
}

pBatches, err := e.marshaller.MarshalTraces(td)
if err != nil {
e.logger.Error("error translating span batch", zap.Error(err))
return consumererror.Permanent(err)
return td.SpanCount(), consumererror.Permanent(err)
}

if err = e.producer.put(pBatches, uuid.New().String()); err != nil {
e.logger.Error("error exporting span to kinesis", zap.Error(err))
return td.SpanCount(), err
}

return 0, nil
}

func (e *exporter) pushMetrics(ctx context.Context, td pdata.Metrics) (int, error) {
if ctx == nil || ctx.Err() != nil {
return 0, fmt.Errorf(errInvalidContext)
}

pBatches, err := e.marshaller.MarshalMetrics(td)
if err != nil {
e.logger.Error("error translating metrics batch", zap.Error(err))
return td.MetricCount(), consumererror.Permanent(err)
}
// TODO: Use a multi error type
var exportErr error
for _, pBatch := range pBatches {
for _, span := range pBatch.GetSpans() {
if span.Process == nil {
span.Process = pBatch.Process
}
err := e.kinesis.ExportSpan(span)
if err != nil {
e.logger.Error("error exporting span to kinesis", zap.Error(err))
exportErr = err
}
}

if err = e.producer.put(pBatches, uuid.New().String()); err != nil {
e.logger.Error("error exporting metrics to kinesis", zap.Error(err))
return td.MetricCount(), err
}
return exportErr

return 0, nil
}
131 changes: 131 additions & 0 deletions exporter/kinesisexporter/exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2019 OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kinesisexporter

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap/zaptest"
)

type producerMock struct {
mock.Mock
}

func (m *producerMock) start() {
m.Called()
}

func (m *producerMock) stop() {
m.Called()
}

func (m *producerMock) put(data []byte, partitionKey string) error {
args := m.Called(data, partitionKey)
return args.Error(0)
}

func TestNewKinesisExporter(t *testing.T) {
t.Parallel()
cfg := createDefaultConfig().(*Config)
require.NotNil(t, cfg)

exp, err := newExporter(cfg, zaptest.NewLogger(t))
assert.NotNil(t, exp)
assert.NoError(t, err)
}

func TestNewKinesisExporterBadEncoding(t *testing.T) {
t.Parallel()
cfg := createDefaultConfig().(*Config)
require.NotNil(t, cfg)
cfg.Encoding = ""

exp, err := newExporter(cfg, zaptest.NewLogger(t))
assert.Nil(t, exp)
assert.Errorf(t, err, "unrecognized encoding")
}

func TestPushingTracesToKinesisQueue(t *testing.T) {
t.Parallel()
cfg := createDefaultConfig().(*Config)
require.NotNil(t, cfg)

exp, _ := newExporter(cfg, zaptest.NewLogger(t))
mockProducer := new(producerMock)
exp.producer = mockProducer
require.NotNil(t, exp)

mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(nil)

dropped, err := exp.pushTraces(context.Background(), pdata.NewTraces())
require.NoError(t, err)
require.Equal(t, 0, dropped)
}

func TestErrorPushingTracesToKinesisQueue(t *testing.T) {
t.Parallel()
cfg := createDefaultConfig().(*Config)
require.NotNil(t, cfg)

exp, _ := newExporter(cfg, zaptest.NewLogger(t))
mockProducer := new(producerMock)
exp.producer = mockProducer
require.NotNil(t, exp)

mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(fmt.Errorf("someerror"))

_, err := exp.pushTraces(context.Background(), pdata.NewTraces())
require.Error(t, err)
}

func TestPushingMetricsToKinesisQueue(t *testing.T) {
t.Parallel()
cfg := createDefaultConfig().(*Config)
require.NotNil(t, cfg)

exp, _ := newExporter(cfg, zaptest.NewLogger(t))
mockProducer := new(producerMock)
exp.producer = mockProducer
require.NotNil(t, exp)

mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(nil)

dropped, err := exp.pushMetrics(context.Background(), pdata.NewMetrics())
require.NoError(t, err)
require.Equal(t, 0, dropped)
}

func TestErrorPushingMetricsToKinesisQueue(t *testing.T) {
t.Parallel()
cfg := createDefaultConfig().(*Config)
require.NotNil(t, cfg)

exp, _ := newExporter(cfg, zaptest.NewLogger(t))
mockProducer := new(producerMock)
exp.producer = mockProducer
require.NotNil(t, exp)

mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(fmt.Errorf("someerror"))

_, err := exp.pushMetrics(context.Background(), pdata.NewMetrics())
require.Error(t, err)
}
Loading

0 comments on commit 2c33fbb

Please sign in to comment.