Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): support grpc streaming in xatu output #425

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
9 changes: 4 additions & 5 deletions pkg/output/http/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type ItemExporter struct {
client *http.Client
}

func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) {
func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (*ItemExporter, error) {
log = log.WithField("output_name", name).WithField("output_type", SinkType)

t := http.DefaultTransport.(*http.Transport).Clone()
Expand All @@ -34,10 +34,9 @@ func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemE
t.DisableKeepAlives = true
}

return ItemExporter{
return &ItemExporter{
config: config,
log: log,

client: &http.Client{
Transport: t,
Timeout: config.ExportTimeout,
Expand All @@ -46,7 +45,7 @@ func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemE
}, nil
}

func (e ItemExporter) ExportItems(ctx context.Context, items []*xatu.DecoratedEvent) error {
func (e *ItemExporter) ExportItems(ctx context.Context, items []*xatu.DecoratedEvent) error {
_, span := observability.Tracer().Start(ctx, "HTTPItemExporter.ExportItems", trace.WithAttributes(attribute.Int64("num_events", int64(len(items)))))
defer span.End()

Expand All @@ -66,7 +65,7 @@ func (e ItemExporter) ExportItems(ctx context.Context, items []*xatu.DecoratedEv
return nil
}

func (e ItemExporter) Shutdown(ctx context.Context) error {
func (e *ItemExporter) Shutdown(ctx context.Context) error {
return nil
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/output/kafka/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type ItemExporter struct {
client sarama.SyncProducer
}

func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) {
func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (*ItemExporter, error) {
producer, err := NewSyncProducer(config)

if err != nil {
Expand All @@ -30,16 +30,16 @@ func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemE
WithField("output_type", SinkType).
Error("Error while creating the Kafka Client")

return ItemExporter{}, err
return nil, err
}

return ItemExporter{
return &ItemExporter{
config: config,
log: log.WithField("output_name", name).WithField("output_type", SinkType),
client: producer,
}, nil
}
func (e ItemExporter) ExportItems(ctx context.Context, items []*xatu.DecoratedEvent) error {
func (e *ItemExporter) ExportItems(ctx context.Context, items []*xatu.DecoratedEvent) error {
_, span := observability.Tracer().Start(ctx, "KafkaItemExporter.ExportItems", trace.WithAttributes(attribute.Int64("num_events", int64(len(items)))))
defer span.End()

Expand All @@ -59,7 +59,7 @@ func (e ItemExporter) ExportItems(ctx context.Context, items []*xatu.DecoratedEv
return nil
}

func (e ItemExporter) Shutdown(ctx context.Context) error {
func (e *ItemExporter) Shutdown(ctx context.Context) error {
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/output/stdout/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ type ItemExporter struct {
log logrus.FieldLogger
}

func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) {
return ItemExporter{
func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (*ItemExporter, error) {
return &ItemExporter{
config: config,
log: log.WithField("output_name", name).WithField("output_type", SinkType),
}, nil
}

func (e ItemExporter) ExportItems(ctx context.Context, items []*xatu.DecoratedEvent) error {
func (e *ItemExporter) ExportItems(ctx context.Context, items []*xatu.DecoratedEvent) error {
_, span := observability.Tracer().Start(ctx, "StdOutItemExporter.ExportItems", trace.WithAttributes(attribute.Int64("num_events", int64(len(items)))))
defer span.End()

Expand All @@ -41,7 +41,7 @@ func (e ItemExporter) ExportItems(ctx context.Context, items []*xatu.DecoratedEv
return nil
}

func (e ItemExporter) Shutdown(ctx context.Context) error {
func (e *ItemExporter) Shutdown(ctx context.Context) error {
return nil
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/output/xatu/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
)

type Config struct {
Name string `yaml:"name"`
Address string `yaml:"address"`
Headers map[string]string `yaml:"headers"`
TLS bool `yaml:"tls" default:"false"`
Expand All @@ -16,6 +17,7 @@ type Config struct {
Workers int `yaml:"workers" default:"1"`
Retry RetryConfig `yaml:"retry"`
KeepAlive KeepAliveConfig `yaml:"keepAlive"`
Streaming StreamingConfig `yaml:"streaming"`
}

type KeepAliveConfig struct {
Expand All @@ -37,3 +39,7 @@ type RetryConfig struct {
Scalar time.Duration `yaml:"scalar" default:"0.5s"`
MaxAttempts int `yaml:"maxAttempts" default:"3"`
}

type StreamingConfig struct {
Enabled bool `yaml:"enabled" default:"false"`
}
Loading
Loading