Skip to content
This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

Commit

Permalink
feat: reuse Kafka proxy config logic (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
smoya authored Aug 30, 2021
1 parent 828e143 commit 7cd5edb
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 46 deletions.
6 changes: 3 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (

// App holds the config for the whole application.
type App struct {
Debug bool
AsyncAPIDoc []byte `split_words:"true"`
Debug bool `desc:"Enable or disable debug logs"`
AsyncAPIDoc []byte `split_words:"true" desc:"Path or URL to a valid AsyncAPI doc (v2.0.0 is only supported)"`
WSServerPort int `split_words:"true" default:"5000" desc:"Port for the Websocket server. Used for debugging events"`
KafkaProxy *KafkaProxy `split_words:"true"`
WSServerPort int `split_words:"true" default:"5000"`
}

// Opt is a functional option used for configuring an App.
Expand Down
99 changes: 57 additions & 42 deletions config/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ import (

// KafkaProxy holds the config for later configuring a Kafka proxy.
type KafkaProxy struct {
BrokerFromServer string `split_words:"true"`
BrokersMapping pipeSeparatedValues `split_words:"true"`
BrokersDialMapping pipeSeparatedValues `split_words:"true"`
ExtraFlags pipeSeparatedValues `split_words:"true"`
MessageValidation MessageValidation `split_words:"true"`
BrokerFromServer string `split_words:"true" desc:"When configuring from an AsyncAPI doc, this allows the user to only configure one server instead of all"`
BrokersMapping pipeSeparatedValues `split_words:"true" desc:"Configure the mapping between remote broker address (the address published by the broker) and desired local address. Format is 'remotehost:remoteport,localhost:localport'. Multiple values can be configured by using pipe separation (|)"`
BrokersDialMapping pipeSeparatedValues `split_words:"true" desc:"Configure the mapping between published remote broker address and the address the proxy will use when forwarding requests. Format is 'remotehost:remoteport,localhost:localport'. Multiple values can be configured by using pipe separation (|)"`
MessageValidation MessageValidation `split_words:"true" desc:""`
ExtraFlags pipeSeparatedValues `split_words:"true" desc:"Advanced configuration. Configure any flag from https://github.com/grepplabs/kafka-proxy/blob/4f3b89fbaecb3eb82426f5dcff5f76188ea9a9dc/cmd/kafka-proxy/server.go#L85-L195. Multiple values can be configured by using pipe separation (|)"`
}

// MessageValidation holds the config about message validation.
type MessageValidation struct {
Enabled bool
Notifier proxy.ValidationErrorNotifier
Enabled bool `default:"true" desc:"Enable or disable validation of Kafka messages"`
Notifier proxy.ValidationErrorNotifier `envconfig:"-"`
}

// NotifyValidationErrorOnChan sets a channel as ValidationError notifier.
Expand Down Expand Up @@ -89,62 +89,77 @@ func (c *KafkaProxy) configFromDoc(d []byte, opts ...kafka.Option) (*kafka.Proxy
opts = append(opts, kafka.WithMessageHandlers(validateMessageHandler(validator)))
}

servers := doc.Servers()
if c.BrokerFromServer != "" {
return kafkaProxyConfigFromServer(c.BrokerFromServer, doc, opts...)
// Pick up only the specified server
s, ok := doc.Server(c.BrokerFromServer)
if !ok {
return nil, fmt.Errorf("server %s not found in the provided AsyncAPI doc", s.Name())
}

if !isValidKafkaProtocol(s) {
return nil, fmt.Errorf("server %s has no kafka protocol configured but '%s'", s.Name(), s.Protocol())
}

servers = []asyncapi.Server{s}
}

return kafkaProxyConfigFromAllServers(doc.Servers(), opts...)
return kafkaProxyConfigFromServers(servers, opts...)
}

func isValidKafkaProtocol(s asyncapi.Server) bool {
return strings.HasPrefix(s.Protocol(), "kafka")
}

func kafkaProxyConfigFromAllServers(servers []asyncapi.Server, opts ...kafka.Option) (*kafka.ProxyConfig, error) {
var brokersMapping []string
var dialAddressMapping []string
for _, s := range servers {
if isValidKafkaProtocol(s) {
l := s.Extension(asyncapi.ExtensionEventGatewayListener)
listenAt, ok := l.(string)
if listenAt == "" || !ok {
return nil, fmt.Errorf("please specify either %s extension, env vars or an AsyncAPI doc in orderr to set the Kafka proxy listener(s)", asyncapi.ExtensionEventGatewayListener)
}

brokersMapping = append(brokersMapping, fmt.Sprintf("%s,%s", s.URL(), listenAt))
func kafkaProxyConfigFromServers(servers []asyncapi.Server, opts ...kafka.Option) (*kafka.ProxyConfig, error) {
brokersMapping, dialAddressMapping, err := extractAddressMappingFromServers(servers...)
if err != nil {
return nil, err
}

if dialMapping := s.Extension(asyncapi.ExtensionEventGatewayDialMapping); dialMapping != nil {
dialAddressMapping = append(dialAddressMapping, fmt.Sprintf("%s,%s", s.URL(), dialMapping))
}
}
if len(brokersMapping) == 0 {
return nil, errors.New("No Kafka brokers were found when configuring")
}

opts = append(opts, kafka.WithDialAddressMapping(dialAddressMapping))
if len(dialAddressMapping) > 0 {
opts = append(opts, kafka.WithDialAddressMapping(dialAddressMapping))
}

return kafka.NewProxyConfig(brokersMapping, opts...)
}

func kafkaProxyConfigFromServer(name string, doc asyncapi.Document, opts ...kafka.Option) (*kafka.ProxyConfig, error) {
s, ok := doc.Server(name)
if !ok {
return nil, fmt.Errorf("server %s not found in the provided AsyncAPI doc", name)
}
func extractAddressMappingFromServers(servers ...asyncapi.Server) (brokersMapping []string, dialAddressMapping []string, err error) {
for _, s := range servers {
if !isValidKafkaProtocol(s) {
continue
}

if !isValidKafkaProtocol(s) {
return nil, fmt.Errorf("server %s has no kafka protocol configured but '%s'", name, s.Protocol())
}
var listenAt string
// If extension is configured, it overrides the value of the port.
if overridePort := s.Extension(asyncapi.ExtensionEventGatewayListener); overridePort != nil {
if val := fmt.Sprintf("%v", overridePort); val != "" { // Convert value to string rep as can be either string or number
if host, _, _ := net.SplitHostPort(val); host == "" {
val = ":" + val // If no host, prefix with : as localhost is inferred
}
listenAt = val
}
} else {
// Use the same port as remote but locally.
_, val, err := net.SplitHostPort(s.URL())
if err != nil {
return nil, nil, errors.Wrapf(err, "error getting port from broker %s. URL:%s", s.Name(), s.URL())
}

// Only one broker will be configured
_, port, err := net.SplitHostPort(s.URL())
if err != nil {
return nil, errors.Wrapf(err, "error getting port from broker %s. URL:%s", s.Name(), s.URL())
}
listenAt = ":" + val // Prefix with : as localhost is inferred
}

if dialMapping := s.Extension(asyncapi.ExtensionEventGatewayDialMapping); dialMapping != nil {
opts = append(opts, kafka.WithDialAddressMapping([]string{fmt.Sprintf("%s,%s", s.URL(), dialMapping)}))
brokersMapping = append(brokersMapping, fmt.Sprintf("%s,%s", s.URL(), listenAt))
if dialMapping := s.Extension(asyncapi.ExtensionEventGatewayDialMapping); dialMapping != nil {
dialAddressMapping = append(dialAddressMapping, fmt.Sprintf("%s,%s", s.URL(), dialMapping))
}
}

return kafka.NewProxyConfig([]string{fmt.Sprintf("%s,:%s", s.URL(), port)}, opts...)
return brokersMapping, dialAddressMapping, nil
}

func validateMessageHandler(validator proxy.MessageValidator) kafka.MessageHandler {
Expand Down
22 changes: 22 additions & 0 deletions config/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,28 @@ func TestKafkaProxy_ProxyConfig(t *testing.T) {
},
doc: []byte(`testdata/simple-kafka.yaml`),
},
{
name: "Valid config. Only one broker + Override listener port from doc",
config: &KafkaProxy{
BrokerFromServer: "test",
},
expectedProxyConfig: func(t *testing.T, c *kafka.ProxyConfig) *kafka.ProxyConfig {
return &kafka.ProxyConfig{
BrokersMapping: []string{"broker.mybrokers.org:9092,:28002"},
}
},
doc: []byte(`testdata/override-port-kafka.yaml`),
},
{
name: "Valid config. All brokers + Override listener port from doc",
config: &KafkaProxy{},
expectedProxyConfig: func(t *testing.T, c *kafka.ProxyConfig) *kafka.ProxyConfig {
return &kafka.ProxyConfig{
BrokersMapping: []string{"broker.mybrokers.org:9092,:28002"},
}
},
doc: []byte(`testdata/override-port-kafka.yaml`),
},
{
name: "Valid config. Only broker mapping",
config: &KafkaProxy{
Expand Down
22 changes: 22 additions & 0 deletions config/testdata/override-port-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
asyncapi: '2.0.0'
info:
title: Test
version: '1.0.0'
servers:
test:
url: broker.mybrokers.org:9092
protocol: kafka
x-eventgateway-listener: 28002
channels:
events:
publish:
operationId: onEvent
message:
name: event
payload:
type: object
properties:
id:
type: integer
minimum: 0
description: Id of the event.
7 changes: 6 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ import (
"github.com/sirupsen/logrus"
)

const configPrefix = "eventgateway"

func main() {
validationErrChan := make(chan *proxy.ValidationError)
c := config.NewApp(config.NotifyValidationErrorOnChan(validationErrChan))

if err := envconfig.Process("eventgateway", c); err != nil {
if err := envconfig.Process(configPrefix, c); err != nil {
_ = envconfig.Usage(configPrefix, c)
logrus.WithError(err).Fatal()
}

Expand All @@ -33,11 +36,13 @@ func main() {

kafkaProxyConfig, err := c.ProxyConfig()
if err != nil {
_ = envconfig.Usage(configPrefix, c)
logrus.WithError(err).Fatal()
}

kafkaProxy, err := kafka.NewProxy(kafkaProxyConfig)
if err != nil {
_ = envconfig.Usage(configPrefix, c)
logrus.WithError(err).Fatal()
}

Expand Down

0 comments on commit 7cd5edb

Please sign in to comment.