From 28b67d648b0a42053dffec2b7b93de69705976a7 Mon Sep 17 00:00:00 2001 From: Sergio Moya <1083296+smoya@users.noreply.github.com> Date: Tue, 13 Jul 2021 17:28:43 +0200 Subject: [PATCH] feat(config): allow configuring single listener from AsyncAPI server (#18) --- asyncapi/document.go | 1 + asyncapi/extension.go | 7 ++ asyncapi/v2/decode_test.go | 21 +++--- asyncapi/v2/v2.go | 6 ++ config/config.go | 28 ++++++++ config/kafka.go | 110 ++++++++++++++++++++++++++++++ config/kafka_test.go | 85 +++++++++++++++++++++++ config/testdata/simple-kafka.yaml | 21 ++++++ kafka/config.go | 97 ++++++++++++++++++++++++++ kafka/config_test.go | 58 ++++++++++++++++ kafka/proxy.go | 18 ++--- kafka/proxy_test.go | 12 ++-- main.go | 80 +++------------------- 13 files changed, 451 insertions(+), 93 deletions(-) create mode 100644 asyncapi/extension.go create mode 100644 config/config.go create mode 100644 config/kafka.go create mode 100644 config/kafka_test.go create mode 100644 config/testdata/simple-kafka.yaml create mode 100644 kafka/config.go create mode 100644 kafka/config_test.go diff --git a/asyncapi/document.go b/asyncapi/document.go index eba300a..80b3bcd 100644 --- a/asyncapi/document.go +++ b/asyncapi/document.go @@ -4,6 +4,7 @@ package asyncapi // It's API implements https://github.com/asyncapi/parser-api/blob/master/docs/v1.md. type Document interface { Extendable + Server(name string) (Server, bool) Servers() []Server HasServers() bool } diff --git a/asyncapi/extension.go b/asyncapi/extension.go new file mode 100644 index 0000000..b36ebac --- /dev/null +++ b/asyncapi/extension.go @@ -0,0 +1,7 @@ +package asyncapi + +// AsyncAPI doc extensions for the Event Gateway. +const ( + ExtensionEventGatewayListener = "x-eventgateway-listener" + ExtensionEventGatewayDialMapping = "x-eventgateway-dial-mapping" +) diff --git a/asyncapi/v2/decode_test.go b/asyncapi/v2/decode_test.go index ceadd73..66fa3be 100644 --- a/asyncapi/v2/decode_test.go +++ b/asyncapi/v2/decode_test.go @@ -3,6 +3,7 @@ package v2 import ( "testing" + "github.com/asyncapi/event-gateway/asyncapi" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -12,7 +13,9 @@ func TestDecodeFromFile(t *testing.T) { require.NoError(t, Decode([]byte("testdata/example-kafka.yaml"), doc)) require.Len(t, doc.Servers(), 1) - s := doc.Servers()[0] + s, ok := doc.Server("test") + assert.True(t, ok) + assert.Equal(t, s, doc.Servers()[0]) assert.True(t, s.HasName()) assert.Equal(t, "test", s.Name()) @@ -22,10 +25,10 @@ func TestDecodeFromFile(t *testing.T) { assert.Equal(t, "kafka-secure", s.Protocol()) assert.True(t, s.HasURL()) assert.Equal(t, "localhost:9092", s.URL()) - assert.True(t, s.HasExtension("x-eventgateway-listener")) - assert.Equal(t, "proxy:28002", s.Extension("x-eventgateway-listener")) - assert.True(t, s.HasExtension("x-eventgateway-dial-mapping")) - assert.Equal(t, "broker:9092", s.Extension("x-eventgateway-dial-mapping")) + assert.True(t, s.HasExtension(asyncapi.ExtensionEventGatewayListener)) + assert.Equal(t, "proxy:28002", s.Extension(asyncapi.ExtensionEventGatewayListener)) + assert.True(t, s.HasExtension(asyncapi.ExtensionEventGatewayDialMapping)) + assert.Equal(t, "broker:9092", s.Extension(asyncapi.ExtensionEventGatewayDialMapping)) assert.Empty(t, s.Variables()) } @@ -73,7 +76,9 @@ channels: require.NoError(t, Decode(raw, doc)) require.Len(t, doc.Servers(), 1) - s := doc.Servers()[0] + s, ok := doc.Server("mosquitto") + assert.True(t, ok) + assert.Equal(t, s, doc.Servers()[0]) assert.True(t, s.HasName()) assert.Equal(t, "mosquitto", s.Name()) @@ -82,7 +87,7 @@ channels: assert.Equal(t, "mqtt", s.Protocol()) assert.True(t, s.HasURL()) assert.Equal(t, "mqtt://test.mosquitto.org", s.URL()) - assert.False(t, s.HasExtension("x-eventgateway-listener")) - assert.False(t, s.HasExtension("x-eventgateway-dial-mapping")) + assert.False(t, s.HasExtension(asyncapi.ExtensionEventGatewayListener)) + assert.False(t, s.HasExtension(asyncapi.ExtensionEventGatewayDialMapping)) assert.Empty(t, s.Variables()) } diff --git a/asyncapi/v2/v2.go b/asyncapi/v2/v2.go index 811e0c9..aca74ff 100644 --- a/asyncapi/v2/v2.go +++ b/asyncapi/v2/v2.go @@ -7,6 +7,11 @@ type Document struct { ServersField map[string]Server `mapstructure:"servers"` } +func (d Document) Server(name string) (asyncapi.Server, bool) { + s, ok := d.ServersField[name] + return s, ok +} + func (d Document) Servers() []asyncapi.Server { var servers []asyncapi.Server for _, s := range d.ServersField { @@ -63,6 +68,7 @@ func (s Server) HasDescription() bool { } func (s Server) URL() string { + // TODO variable substitution if applies return s.URLField } diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..c4fd8d5 --- /dev/null +++ b/config/config.go @@ -0,0 +1,28 @@ +package config + +import ( + "strings" + + "github.com/asyncapi/event-gateway/kafka" +) + +// App holds the config for the whole application. +type App struct { + Debug bool + AsyncAPIDoc []byte `split_words:"true"` + KafkaProxy KafkaProxy `split_words:"true"` +} + +// ProxyConfig creates a config struct for the Kafka Proxy. +func (c App) ProxyConfig() (*kafka.ProxyConfig, error) { + return c.KafkaProxy.ProxyConfig(c.AsyncAPIDoc, c.Debug) +} + +type pipeSeparatedValues struct { + Values []string +} + +func (b *pipeSeparatedValues) Set(value string) error { + b.Values = strings.Split(value, "|") + return nil +} diff --git a/config/kafka.go b/config/kafka.go new file mode 100644 index 0000000..cb2bea1 --- /dev/null +++ b/config/kafka.go @@ -0,0 +1,110 @@ +package config + +import ( + "fmt" + "net" + "strings" + + "github.com/asyncapi/event-gateway/asyncapi" + v2 "github.com/asyncapi/event-gateway/asyncapi/v2" + "github.com/asyncapi/event-gateway/kafka" + "github.com/pkg/errors" +) + +// KafkaProxy holds the config for later configuring a Kafka proxy. +type KafkaProxy struct { + BrokerFromServer string `split_words:"true"` + BrokersMapping pipeSeparatedValues `split_words:"true"` + BrokersDialMapping pipeSeparatedValues `split_words:"true"` + ExtraFlags pipeSeparatedValues `split_words:"true"` +} + +// ProxyConfig creates a config struct for the Kafka Proxy based on a given AsyncAPI doc (if provided). +func (c *KafkaProxy) ProxyConfig(doc []byte, debug bool) (*kafka.ProxyConfig, error) { + if len(doc) == 0 && len(c.BrokersMapping.Values) == 0 { + return nil, errors.New("either AsyncAPIDoc or KafkaProxyBrokersMapping config should be provided") + } + + if c.BrokerFromServer != "" && len(doc) == 0 { + return nil, errors.New("AsyncAPIDoc should be provided when setting BrokerFromServer") + } + + var kafkaProxyConfig *kafka.ProxyConfig + var err error + if len(doc) > 0 { + kafkaProxyConfig, err = c.configFromDoc(doc) + } else { + kafkaProxyConfig, err = kafka.NewProxyConfig(c.BrokersMapping.Values, kafka.WithDialAddressMapping(c.BrokersDialMapping.Values), kafka.WithExtra(c.ExtraFlags.Values)) + } + + if err != nil { + return nil, err + } + + kafkaProxyConfig.Debug = debug + + return kafkaProxyConfig, nil +} + +func (c *KafkaProxy) configFromDoc(d []byte) (*kafka.ProxyConfig, error) { + doc := new(v2.Document) + if err := v2.Decode(d, doc); err != nil { + return nil, errors.Wrap(err, "error decoding AsyncAPI json doc to Document struct") + } + + if c.BrokerFromServer != "" { + return kafkaProxyConfigFromServer(c.BrokerFromServer, doc) + } + + return kafkaProxyConfigFromAllServers(doc.Servers()) +} + +func isValidKafkaProtocol(s asyncapi.Server) bool { + return strings.HasPrefix(s.Protocol(), "kafka") +} + +func kafkaProxyConfigFromAllServers(servers []asyncapi.Server) (*kafka.ProxyConfig, error) { + var brokersMapping []string + var dialAddressMapping []string + for _, s := range servers { + if isValidKafkaProtocol(s) { + l := s.Extension(asyncapi.ExtensionEventGatewayListener) + listenAt, ok := l.(string) + if listenAt == "" || !ok { + return nil, fmt.Errorf("please specify either %s extension, env vars or an AsyncAPI doc in orderr to set the Kafka proxy listener(s)", asyncapi.ExtensionEventGatewayListener) + } + + brokersMapping = append(brokersMapping, fmt.Sprintf("%s,%s", s.URL(), listenAt)) + + if dialMapping := s.Extension(asyncapi.ExtensionEventGatewayDialMapping); dialMapping != nil { + dialAddressMapping = append(dialAddressMapping, fmt.Sprintf("%s,%s", s.URL(), dialMapping)) + } + } + } + + return kafka.NewProxyConfig(brokersMapping, kafka.WithDialAddressMapping(dialAddressMapping)) +} + +func kafkaProxyConfigFromServer(name string, doc asyncapi.Document) (*kafka.ProxyConfig, error) { + s, ok := doc.Server(name) + if !ok { + return nil, fmt.Errorf("server %s not found in the provided AsyncAPI doc", name) + } + + if !isValidKafkaProtocol(s) { + return nil, fmt.Errorf("server %s has no kafka protocol configured but '%s'", name, s.Protocol()) + } + + // Only one broker will be configured + _, port, err := net.SplitHostPort(s.URL()) + if err != nil { + return nil, errors.Wrapf(err, "error getting port from broker %s. URL:%s", s.Name(), s.URL()) + } + + var opts []kafka.Option + if dialMapping := s.Extension(asyncapi.ExtensionEventGatewayDialMapping); dialMapping != nil { + opts = append(opts, kafka.WithDialAddressMapping([]string{fmt.Sprintf("%s,%s", s.URL(), dialMapping)})) + } + + return kafka.NewProxyConfig([]string{fmt.Sprintf("%s,:%s", s.URL(), port)}, opts...) +} diff --git a/config/kafka_test.go b/config/kafka_test.go new file mode 100644 index 0000000..d228f2f --- /dev/null +++ b/config/kafka_test.go @@ -0,0 +1,85 @@ +package config + +import ( + "testing" + + "github.com/asyncapi/event-gateway/kafka" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +//nolint:funlen +func TestKafkaProxy_ProxyConfig(t *testing.T) { + tests := []struct { + name string + config *KafkaProxy + doc []byte + expectedProxyConfig *kafka.ProxyConfig + expectedErr error + }{ + { + name: "Valid config. Only one broker from doc", + config: &KafkaProxy{ + BrokerFromServer: "test", + }, + expectedProxyConfig: &kafka.ProxyConfig{ + BrokersMapping: []string{"broker.mybrokers.org:9092,:9092"}, + }, + doc: []byte(`testdata/simple-kafka.yaml`), + }, + { + name: "Valid config. Only broker mapping", + config: &KafkaProxy{ + BrokersMapping: pipeSeparatedValues{Values: []string{"broker.mybrokers.org:9092,:9092"}}, + }, + expectedProxyConfig: &kafka.ProxyConfig{ + BrokersMapping: []string{"broker.mybrokers.org:9092,:9092"}, + }, + }, + { + name: "Valid config. Broker mapping + Dial mapping", + config: &KafkaProxy{ + BrokersMapping: pipeSeparatedValues{Values: []string{"broker.mybrokers.org:9092,:9092"}}, + BrokersDialMapping: pipeSeparatedValues{Values: []string{"broker.mybrokers.org:9092,192.168.1.10:9092"}}, + }, + expectedProxyConfig: &kafka.ProxyConfig{ + BrokersMapping: []string{"broker.mybrokers.org:9092,:9092"}, + DialAddressMapping: []string{"broker.mybrokers.org:9092,192.168.1.10:9092"}, + }, + }, + { + name: "Invalid config. No broker mapping", + config: &KafkaProxy{}, + expectedErr: errors.New("either AsyncAPIDoc or KafkaProxyBrokersMapping config should be provided"), + }, + { + name: "Invalid config. Both broker and proxy can't listen to the same port within same host", + config: &KafkaProxy{ + BrokersMapping: pipeSeparatedValues{Values: []string{"localhost:9092,:9092"}}, + }, + expectedErr: errors.New("broker and proxy can't listen to the same port on the same host. Broker is already listening at localhost:9092. Please configure a different listener port"), + }, + { + name: "Invalid config. Both broker and proxy are the same", + config: &KafkaProxy{ + BrokersMapping: pipeSeparatedValues{Values: []string{"broker.mybrokers.org:9092,broker.mybrokers.org:9092"}}, + }, + expectedErr: errors.New("broker and proxy can't listen to the same port on the same host. Broker is already listening at broker.mybrokers.org:9092. Please configure a different listener port"), + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + proxyConfig, err := test.config.ProxyConfig(test.doc, false) + if test.expectedErr != nil { + assert.EqualError(t, err, test.expectedErr.Error()) + } else { + assert.NoError(t, err) + } + + if test.expectedProxyConfig != nil { + assert.EqualValues(t, test.expectedProxyConfig, proxyConfig) + } + }) + } +} diff --git a/config/testdata/simple-kafka.yaml b/config/testdata/simple-kafka.yaml new file mode 100644 index 0000000..dcb745c --- /dev/null +++ b/config/testdata/simple-kafka.yaml @@ -0,0 +1,21 @@ +asyncapi: '2.0.0' +info: + title: Test + version: '1.0.0' +servers: + test: + url: broker.mybrokers.org:9092 + protocol: kafka +channels: + events: + publish: + operationId: onEvent + message: + name: event + payload: + type: object + properties: + id: + type: integer + minimum: 0 + description: Id of the event. \ No newline at end of file diff --git a/kafka/config.go b/kafka/config.go new file mode 100644 index 0000000..4347d4e --- /dev/null +++ b/kafka/config.go @@ -0,0 +1,97 @@ +package kafka + +import ( + "fmt" + "net" + "regexp" + "strings" + + "github.com/pkg/errors" +) + +var localHostIpv4 = regexp.MustCompile(`127\.0\.0\.\d+`) + +// ProxyConfig holds the configuration for the Kafka Proxy. +type ProxyConfig struct { + BrokersMapping []string + DialAddressMapping []string + ExtraConfig []string + Debug bool +} + +type Option func(*ProxyConfig) error + +// WithDebug enables debug. +func WithDebug() Option { + return func(c *ProxyConfig) error { + c.Debug = true + return nil + } +} + +// WithDialAddressMapping configures Dial Address Mapping. +func WithDialAddressMapping(mapping []string) Option { + return func(c *ProxyConfig) error { + c.DialAddressMapping = mapping + return nil + } +} + +// WithExtra configures extra parameters. +func WithExtra(extra []string) Option { + return func(c *ProxyConfig) error { + c.ExtraConfig = extra + return nil + } +} + +// NewProxyConfig creates a new ProxyConfig. +func NewProxyConfig(brokersMapping []string, opts ...Option) (*ProxyConfig, error) { + c := &ProxyConfig{BrokersMapping: brokersMapping} + for _, opt := range opts { + if err := opt(c); err != nil { + return nil, err + } + } + + return c, c.Validate() +} + +// Validate validates ProxyConfig. +func (c *ProxyConfig) Validate() error { + if len(c.BrokersMapping) == 0 { + return errors.New("BrokersMapping is mandatory") + } + + invalidFormatMsg := "BrokersMapping should be in form 'remotehost:remoteport,localhost:localport" + for _, m := range c.BrokersMapping { + v := strings.Split(m, ",") + if len(v) != 2 { + return errors.New(invalidFormatMsg) + } + + remoteHost, remotePort, err := net.SplitHostPort(v[0]) + if err != nil { + return errors.Wrap(err, invalidFormatMsg) + } + + localHost, localPort, err := net.SplitHostPort(v[1]) + if err != nil { + return errors.Wrap(err, invalidFormatMsg) + } + + if remoteHost == localHost && remotePort == localPort || (isLocalHost(remoteHost) && isLocalHost(localHost) && remotePort == localPort) { + return fmt.Errorf("broker and proxy can't listen to the same port on the same host. Broker is already listening at %s. Please configure a different listener port", v[0]) + } + } + + return nil +} + +func isLocalHost(host string) bool { + return host == "" || + host == "::1" || + host == "0:0:0:0:0:0:0:1" || + localHostIpv4.MatchString(host) || + host == "localhost" +} diff --git a/kafka/config_test.go b/kafka/config_test.go new file mode 100644 index 0000000..9edbf31 --- /dev/null +++ b/kafka/config_test.go @@ -0,0 +1,58 @@ +package kafka + +import ( + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +func TestProxyConfig_Validate(t *testing.T) { + tests := []struct { + name string + config ProxyConfig + expectedErr error + }{ + { + name: "Valid config. Only broker mapping", + config: ProxyConfig{ + BrokersMapping: []string{"broker.mybrokers.org:9092,:9092"}, + }, + }, + { + name: "Valid config. Broker mapping + Dial mapping", + config: ProxyConfig{ + BrokersMapping: []string{"broker.mybrokers.org:9092,:9092"}, + DialAddressMapping: []string{"broker.mybrokers.org:9092,192.168.1.10:9092"}, + }, + }, + { + name: "Invalid config. No broker mapping", + expectedErr: errors.New("BrokersMapping is mandatory"), + }, + { + name: "Invalid config. Both broker and proxy can't listen to the same port within same host", + config: ProxyConfig{ + BrokersMapping: []string{"localhost:9092,:9092"}, + }, + expectedErr: errors.New("broker and proxy can't listen to the same port on the same host. Broker is already listening at localhost:9092. Please configure a different listener port"), + }, + { + name: "Invalid config. Both broker and proxy are the same", + config: ProxyConfig{ + BrokersMapping: []string{"broker.mybrokers.org:9092,broker.mybrokers.org:9092"}, + }, + expectedErr: errors.New("broker and proxy can't listen to the same port on the same host. Broker is already listening at broker.mybrokers.org:9092. Please configure a different listener port"), + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + err := test.config.Validate() + if test.expectedErr != nil { + assert.EqualError(t, err, test.expectedErr.Error()) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/kafka/proxy.go b/kafka/proxy.go index 7b9af25..be6b6d0 100644 --- a/kafka/proxy.go +++ b/kafka/proxy.go @@ -15,16 +15,16 @@ import ( "github.com/sirupsen/logrus" ) -// ProxyConfig holds the configuration for the Kafka Proxy. -type ProxyConfig struct { - BrokersMapping []string - DialAddressMapping []string - ExtraConfig []string - Debug bool -} - // NewProxy creates a new Kafka Proxy based on a given configuration. -func NewProxy(c ProxyConfig) (proxy.Proxy, error) { +func NewProxy(c *ProxyConfig) (proxy.Proxy, error) { + if c == nil { + return nil, errors.New("config should be provided") + } + + if err := c.Validate(); err != nil { + return nil, err + } + // Yeah, not a good practice at all but I guess it's fine for now. kafkaproxy.ActualDefaultRequestHandler.RequestKeyHandlers.Set(protocol.RequestAPIKeyProduce, &requestKeyHandler{}) diff --git a/kafka/proxy_test.go b/kafka/proxy_test.go index f76db9f..f5f33e8 100644 --- a/kafka/proxy_test.go +++ b/kafka/proxy_test.go @@ -18,19 +18,23 @@ import ( func TestNewKafka(t *testing.T) { tests := []struct { name string - c ProxyConfig + c *ProxyConfig expectedErr error }{ { name: "Proxy is created when config is valid", - c: ProxyConfig{ + c: &ProxyConfig{ BrokersMapping: []string{"localhost:9092, localhost:28002"}, }, }, { name: "Proxy creation errors when config is invalid", - c: ProxyConfig{}, - expectedErr: errors.New("Brokers mapping is required"), + c: &ProxyConfig{}, + expectedErr: errors.New("BrokersMapping is mandatory"), + }, + { + name: "Proxy creation errors when config is missing", + expectedErr: errors.New("config should be provided"), }, } for _, test := range tests { diff --git a/main.go b/main.go index 690be65..c9c7c58 100644 --- a/main.go +++ b/main.go @@ -2,98 +2,34 @@ package main import ( "context" - "fmt" - "strings" - v2 "github.com/asyncapi/event-gateway/asyncapi/v2" + "github.com/asyncapi/event-gateway/config" "github.com/asyncapi/event-gateway/kafka" "github.com/kelseyhightower/envconfig" - "github.com/pkg/errors" "github.com/sirupsen/logrus" ) -type config struct { - Debug bool - AsyncAPIDoc []byte `split_words:"true"` - KafkaProxyBrokersMapping pipeSeparatedValues ` split_words:"true"` - KafkaProxyBrokersDialMapping pipeSeparatedValues `split_words:"true"` - KafkaProxyExtraFlags pipeSeparatedValues `split_words:"true"` -} - -type pipeSeparatedValues struct { - values []string -} - -func (b *pipeSeparatedValues) Set(value string) error { //nolint:unparam - b.values = strings.Split(value, "|") - return nil -} - func main() { - var c config + var c config.App if err := envconfig.Process("eventgateway", &c); err != nil { - logrus.Fatal(err) - } - - if len(c.AsyncAPIDoc) == 0 && len(c.KafkaProxyBrokersMapping.values) == 0 { - logrus.Fatalln("Either AsyncAPIDoc or KafkaProxyBrokersMapping config should be provided") + logrus.WithError(err).Fatal() } if c.Debug { logrus.SetLevel(logrus.DebugLevel) } - var kafkaProxyConfig kafka.ProxyConfig - if len(c.AsyncAPIDoc) > 0 { - conf, err := configFromDoc(c.AsyncAPIDoc) - if err != nil { - logrus.WithError(err).Fatal() - } - - kafkaProxyConfig = conf - } else { - kafkaProxyConfig = kafka.ProxyConfig{ - BrokersMapping: c.KafkaProxyBrokersMapping.values, - DialAddressMapping: c.KafkaProxyBrokersDialMapping.values, - } + kafkaProxyConfig, err := c.ProxyConfig() + if err != nil { + logrus.WithError(err).Fatal() } - kafkaProxyConfig.Debug = c.Debug - kafkaProxyConfig.ExtraConfig = c.KafkaProxyExtraFlags.values - kafkaProxy, err := kafka.NewProxy(kafkaProxyConfig) if err != nil { - logrus.Fatalln(err) + logrus.WithError(err).Fatal() } if err := kafkaProxy(context.Background()); err != nil { - logrus.Fatalln(err) - } -} - -func configFromDoc(d []byte) (kafka.ProxyConfig, error) { - var kafkaProxyConfig kafka.ProxyConfig - - doc := new(v2.Document) - if err := v2.Decode(d, doc); err != nil { - return kafkaProxyConfig, errors.Wrap(err, "error decoding AsyncAPI json doc to Document struct") + logrus.WithError(err).Fatal() } - - for _, s := range doc.Servers() { - if strings.HasPrefix(s.Protocol(), "kafka") { - listenAt := s.Extension("x-eventgateway-listener") - if listenAt == nil { - return kafkaProxyConfig, errors.New("x-eventgateway-listener extension is mandatory for opening ports to listen connections") - } - - // TODO should be only port config also accepted? - kafkaProxyConfig.BrokersMapping = append(kafkaProxyConfig.BrokersMapping, fmt.Sprintf("%s,%s", s.URL(), listenAt)) - - if dialMapping := s.Extension("x-eventgateway-dial-mapping"); dialMapping != nil { - kafkaProxyConfig.DialAddressMapping = append(kafkaProxyConfig.DialAddressMapping, fmt.Sprintf("%s,%s", s.URL(), dialMapping)) - } - } - } - - return kafkaProxyConfig, nil }