Skip to content

Commit

Permalink
create http dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickdemers6 committed Jan 24, 2024
1 parent 4184af1 commit 622a612
Show file tree
Hide file tree
Showing 13 changed files with 460 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ The following [dispatchers](./telemetry/producer.go#L10-L19) are supported
* Override stream names with env variables: KINESIS_STREAM_\*uppercase topic\* ex.: `KINESIS_STREAM_V`
* Google pubsub: Along with the required pubsub config (See ./test/integration/config.json for example), be sure to set the environment variable `GOOGLE_APPLICATION_CREDENTIALS`
* ZMQ: Configure with the config.json file. See implementation here: [config/config.go](./config/config.go)
* HTTP: Send events to an HTTP endpoint. This should **never** be used in production grade systems as failed requests are discarded and there is limited scalability. It is suitable for owners wishing to stream data directly from their own vehicles. [View configuration options](./datastore/http/http.go#L37).
* Logger: This is a simple STDOUT logger that serializes the protos to json.

>NOTE: To add a new dispatcher, please provide integration tests and updated documentation. To serialize dispatcher data as json instead of protobufs, add a config `transmit_decoded_records` and set value to `true` as shown [here](config/test_configs_test.go#L104)
Expand Down
15 changes: 15 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
confluent "github.com/confluentinc/confluent-kafka-go/v2/kafka"

"github.com/teslamotors/fleet-telemetry/datastore/googlepubsub"
"github.com/teslamotors/fleet-telemetry/datastore/http"
"github.com/teslamotors/fleet-telemetry/datastore/kafka"
"github.com/teslamotors/fleet-telemetry/datastore/kinesis"
"github.com/teslamotors/fleet-telemetry/datastore/simple"
Expand Down Expand Up @@ -65,6 +66,9 @@ type Config struct {
// ZMQ configures a zeromq socket
ZMQ *zmq.Config `json:"zmq,omitempty"`

// HTTP is a configuration for HTTP producer
HTTP *http.Config `json:"http,omitempty"`

// Namespace defines a prefix for the kafka/pubsub topic
Namespace string `json:"namespace,omitempty"`

Expand Down Expand Up @@ -254,6 +258,17 @@ func (c *Config) ConfigureProducers(logger *logrus.Logger) (map[string][]telemet
producers[telemetry.ZMQ] = zmqProducer
}

if _, ok := requiredDispatchers[telemetry.HTTP]; ok {
if c.HTTP == nil {
return nil, errors.New("expected http to be configured")
}
httpProducer, err := http.NewProducer(c.HTTP, c.TransmitDecodedRecords, c.MetricCollector, c.Namespace, logger)
if err != nil {
return nil, err
}
producers[telemetry.HTTP] = httpProducer
}

dispatchProducerRules := make(map[string][]telemetry.Producer)
for recordName, dispatchRules := range c.Records {
var dispatchFuncs []telemetry.Producer
Expand Down
28 changes: 28 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,34 @@ var _ = Describe("Test full application config", func() {
})
})

Context("configure http", func() {
var httpConfig *Config

BeforeEach(func() {
var err error
httpConfig, err = loadTestApplicationConfig(TestHTTPConfig)
Expect(err).NotTo(HaveOccurred())
})

It("returns an error if http isn't included", func() {
log, _ := test.NewNullLogger()
config.Records = map[string][]telemetry.Dispatcher{"FS": {"http"}}
var err error
producers, err = config.ConfigureProducers(log)
Expect(err).To(MatchError("expected http to be configured"))
Expect(producers).To(BeNil())
producers, err = httpConfig.ConfigureProducers(log)
Expect(err).To(BeNil())
})

It("http config works", func() {
log, _ := test.NewNullLogger()
producers, err := httpConfig.ConfigureProducers(log)
Expect(err).NotTo(HaveOccurred())
Expect(producers["FS"]).NotTo(BeNil())
})
})

Context("configureMetricsCollector", func() {
It("does not fail when TLS is nil ", func() {
log, _ := test.NewNullLogger()
Expand Down
14 changes: 14 additions & 0 deletions config/test_configs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,17 @@ const TestTransmitDecodedRecords = `
}
}
`

const TestHTTPConfig = `
{
"host": "127.0.0.1",
"port": 443,
"status_port": 8080,
"http": {
"url": "https://tesla.com/webhook"
},
"records": {
"FS": ["http"]
}
}
`
234 changes: 234 additions & 0 deletions datastore/http/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package http

import (
"bytes"
"crypto/tls"
"errors"
"fmt"
"hash/fnv"
"net/http"
"sync"
"time"

"github.com/sirupsen/logrus"

"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/metrics/adapter"
"github.com/teslamotors/fleet-telemetry/telemetry"
)

const (
jsonContentType = "application/json"
protobufContentType = "application/x-protobuf"
)

// Producer client to handle http
type Producer struct {
namespace string
metricsCollector metrics.MetricCollector
logger *logrus.Logger
workerChannels []chan *telemetry.Record
address string
httpClient http.Client
produceDecodedRecords bool
}

// Metrics stores metrics reported from this package
type Metrics struct {
produceCount adapter.Counter
byteTotal adapter.Counter
errorCount adapter.Counter
bufferSize adapter.Gauge
}

// TLSCertificate contains the paths to the certificate and key files.
type TLSCertificate struct {
CertFile string `json:"cert"`
KeyFile string `json:"key"`
}

// Config contains the data necessary to configure an http producer.
type Config struct {
// WorkerCount is the number of http producer routines running.
// This number should be increased if `http_produce_buffer_size` is growing.
WorkerCount int `json:"worker_count"`

// Address is the address to produce requests to.
Address string `json:"address"`

// Timeout is the number of seconds to wait for a response. Defaults to 10.
Timeout int `json:"timeout"`

// TLS is the TLS configuration for the http producer.
TLS *TLSCertificate `json:"tls,omitempty"`
}

const (
statusCodePreSendErr = "NOT_SENT"
bufferSize = 128
)

var (
metricsRegistry Metrics
metricsOnce sync.Once
)

// NewProducer sets up an HTTP producer
func NewProducer(config *Config, produceDecodedRecords bool, metricsCollector metrics.MetricCollector, namespace string, logger *logrus.Logger) (telemetry.Producer, error) {
registerMetricsOnce(metricsCollector)

err := validateConfig(config)
if err != nil {
return nil, err
}

transport := http.DefaultTransport
if config.TLS != nil {
transport, err = getTransport(config.TLS)
if err != nil {
return nil, err
}
}

producer := &Producer{
namespace: namespace,
metricsCollector: metricsCollector,
workerChannels: make([]chan *telemetry.Record, config.WorkerCount),
address: config.Address,
logger: logger,
produceDecodedRecords: produceDecodedRecords,
httpClient: http.Client{
Timeout: time.Duration(config.Timeout) * time.Second,
Transport: transport,
},
}

for i := 0; i < config.WorkerCount; i++ {
producer.workerChannels[i] = make(chan *telemetry.Record, bufferSize)
go producer.worker(producer.workerChannels[i])
}

logger.Infof("registered http producer for namespace: %s", namespace)
return producer, nil
}

// validateConfig validates configuration values and sets defaults if value not set
func validateConfig(c *Config) error {
if c.WorkerCount < 0 {
return errors.New("invalid http worker count")
}
if c.WorkerCount == 0 {
c.WorkerCount = 5
}
if c.Timeout < 0 {
return errors.New("invalid http timeout")
}
if c.Timeout == 0 {
c.Timeout = 10
}
return nil
}

func getTransport(tlsConfig *TLSCertificate) (*http.Transport, error) {
clientTLSCert, err := tls.LoadX509KeyPair(tlsConfig.CertFile, tlsConfig.KeyFile)
if err != nil {
return nil, err
}
return &http.Transport{
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{clientTLSCert},
},
}, nil
}

func (p *Producer) worker(records <-chan *telemetry.Record) {
for record := range records {
metricsRegistry.bufferSize.Sub(1, map[string]string{})
p.sendHTTP(record)
}
}

func (p *Producer) sendHTTP(record *telemetry.Record) {
url := fmt.Sprintf("%s?namespace=%s&type=%s", p.address, p.namespace, record.TxType)
payload := record.Payload()

req, err := http.NewRequest("POST", url, bytes.NewBuffer(payload))
if err != nil {
p.logError(fmt.Errorf("create_request_err %s", err.Error()))
return
}

contentType := protobufContentType
if p.produceDecodedRecords {
contentType = jsonContentType
}
req.Header.Set("Content-Type", contentType)
for key, value := range record.Metadata() {
req.Header.Set(key, value)
}

resp, err := p.httpClient.Do(req)
if err != nil {
p.logError(fmt.Errorf("send_request_err %s", err.Error()))
return
}
defer resp.Body.Close()

if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
metricsRegistry.produceCount.Inc(map[string]string{"record_type": record.TxType})
metricsRegistry.byteTotal.Add(int64(record.Length()), map[string]string{"record_type": record.TxType})
return
}
p.logError(fmt.Errorf("response_status_code %d", resp.StatusCode))
}

func vinToHash(vin string) uint32 {
h := fnv.New32a()
h.Write([]byte(vin))
return h.Sum32()
}

func (p *Producer) getRecordChannel(vin string) chan *telemetry.Record {
return p.workerChannels[vinToHash(vin)%uint32(len(p.workerChannels))]
}

// Produce asynchronously sends the record payload to http endpoint
func (p *Producer) Produce(record *telemetry.Record) {
p.getRecordChannel(record.Vin) <- record
metricsRegistry.bufferSize.Inc(map[string]string{})
}

func (p *Producer) logError(err error) {
p.logger.Errorf("http_producer_err err: %v", err)
metricsRegistry.errorCount.Inc(map[string]string{})
}

func registerMetricsOnce(metricsCollector metrics.MetricCollector) {
metricsOnce.Do(func() { registerMetrics(metricsCollector) })
}

func registerMetrics(metricsCollector metrics.MetricCollector) {
metricsRegistry.produceCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{
Name: "http_produce_total",
Help: "The number of records produced to http.",
Labels: []string{"record_type"},
})

metricsRegistry.byteTotal = metricsCollector.RegisterCounter(adapter.CollectorOptions{
Name: "http_produce_total_bytes",
Help: "The number of bytes produced to http.",
Labels: []string{"record_type"},
})

metricsRegistry.errorCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{
Name: "http_produce_err",
Help: "The number of errors while producing to http.",
Labels: []string{},
})

metricsRegistry.bufferSize = metricsCollector.RegisterGauge(adapter.CollectorOptions{
Name: "http_produce_buffer_size",
Help: "The number of records waiting to be produced.",
Labels: []string{},
})
}
13 changes: 13 additions & 0 deletions datastore/http/http_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package http_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestConfigs(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Http Producer Suite Tests")
}
50 changes: 50 additions & 0 deletions datastore/http/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package http_test

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/sirupsen/logrus"
"github.com/teslamotors/fleet-telemetry/datastore/http"
"github.com/teslamotors/fleet-telemetry/metrics"
)

var _ = Describe("HTTP Producer", func() {
var (
mockLogger *logrus.Logger
mockCollector metrics.MetricCollector
mockConfig *http.Config
)

BeforeEach(func() {
mockLogger = logrus.New()
mockCollector = metrics.NewCollector(nil, mockLogger)
mockConfig = &http.Config{
WorkerCount: 2,
Address: "https://tesla.com",
Timeout: 5,
}
})

Context("NewProducer", func() {
It("creates a new HTTP producer with valid config", func() {
producer, err := http.NewProducer(mockConfig, false, mockCollector, "test", mockLogger)
Expect(err).ToNot(HaveOccurred())
Expect(producer).ToNot(BeNil())
})

It("returns an error for negative worker count", func() {
mockConfig.WorkerCount = -1
producer, err := http.NewProducer(mockConfig, false, mockCollector, "test", mockLogger)
Expect(err).To(HaveOccurred())
Expect(producer).To(BeNil())
})

It("returns an error for negative timeout", func() {
mockConfig.Timeout = -1
producer, err := http.NewProducer(mockConfig, false, mockCollector, "test", mockLogger)
Expect(err).To(HaveOccurred())
Expect(producer).To(BeNil())
})
})
})
Loading

0 comments on commit 622a612

Please sign in to comment.