Skip to content
This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

Commit

Permalink
feat(config): allow configuring single listener from AsyncAPI server (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
smoya authored Jul 13, 2021
1 parent e9411d1 commit 28b67d6
Show file tree
Hide file tree
Showing 13 changed files with 451 additions and 93 deletions.
1 change: 1 addition & 0 deletions asyncapi/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package asyncapi
// It's API implements https://github.com/asyncapi/parser-api/blob/master/docs/v1.md.
type Document interface {
Extendable
Server(name string) (Server, bool)
Servers() []Server
HasServers() bool
}
Expand Down
7 changes: 7 additions & 0 deletions asyncapi/extension.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package asyncapi

// AsyncAPI doc extensions for the Event Gateway.
const (
ExtensionEventGatewayListener = "x-eventgateway-listener"
ExtensionEventGatewayDialMapping = "x-eventgateway-dial-mapping"
)
21 changes: 13 additions & 8 deletions asyncapi/v2/decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v2
import (
"testing"

"github.com/asyncapi/event-gateway/asyncapi"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -12,7 +13,9 @@ func TestDecodeFromFile(t *testing.T) {
require.NoError(t, Decode([]byte("testdata/example-kafka.yaml"), doc))

require.Len(t, doc.Servers(), 1)
s := doc.Servers()[0]
s, ok := doc.Server("test")
assert.True(t, ok)
assert.Equal(t, s, doc.Servers()[0])

assert.True(t, s.HasName())
assert.Equal(t, "test", s.Name())
Expand All @@ -22,10 +25,10 @@ func TestDecodeFromFile(t *testing.T) {
assert.Equal(t, "kafka-secure", s.Protocol())
assert.True(t, s.HasURL())
assert.Equal(t, "localhost:9092", s.URL())
assert.True(t, s.HasExtension("x-eventgateway-listener"))
assert.Equal(t, "proxy:28002", s.Extension("x-eventgateway-listener"))
assert.True(t, s.HasExtension("x-eventgateway-dial-mapping"))
assert.Equal(t, "broker:9092", s.Extension("x-eventgateway-dial-mapping"))
assert.True(t, s.HasExtension(asyncapi.ExtensionEventGatewayListener))
assert.Equal(t, "proxy:28002", s.Extension(asyncapi.ExtensionEventGatewayListener))
assert.True(t, s.HasExtension(asyncapi.ExtensionEventGatewayDialMapping))
assert.Equal(t, "broker:9092", s.Extension(asyncapi.ExtensionEventGatewayDialMapping))
assert.Empty(t, s.Variables())
}

Expand Down Expand Up @@ -73,7 +76,9 @@ channels:
require.NoError(t, Decode(raw, doc))

require.Len(t, doc.Servers(), 1)
s := doc.Servers()[0]
s, ok := doc.Server("mosquitto")
assert.True(t, ok)
assert.Equal(t, s, doc.Servers()[0])

assert.True(t, s.HasName())
assert.Equal(t, "mosquitto", s.Name())
Expand All @@ -82,7 +87,7 @@ channels:
assert.Equal(t, "mqtt", s.Protocol())
assert.True(t, s.HasURL())
assert.Equal(t, "mqtt://test.mosquitto.org", s.URL())
assert.False(t, s.HasExtension("x-eventgateway-listener"))
assert.False(t, s.HasExtension("x-eventgateway-dial-mapping"))
assert.False(t, s.HasExtension(asyncapi.ExtensionEventGatewayListener))
assert.False(t, s.HasExtension(asyncapi.ExtensionEventGatewayDialMapping))
assert.Empty(t, s.Variables())
}
6 changes: 6 additions & 0 deletions asyncapi/v2/v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ type Document struct {
ServersField map[string]Server `mapstructure:"servers"`
}

func (d Document) Server(name string) (asyncapi.Server, bool) {
s, ok := d.ServersField[name]
return s, ok
}

func (d Document) Servers() []asyncapi.Server {
var servers []asyncapi.Server
for _, s := range d.ServersField {
Expand Down Expand Up @@ -63,6 +68,7 @@ func (s Server) HasDescription() bool {
}

func (s Server) URL() string {
// TODO variable substitution if applies
return s.URLField
}

Expand Down
28 changes: 28 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package config

import (
"strings"

"github.com/asyncapi/event-gateway/kafka"
)

// App holds the config for the whole application.
type App struct {
Debug bool
AsyncAPIDoc []byte `split_words:"true"`
KafkaProxy KafkaProxy `split_words:"true"`
}

// ProxyConfig creates a config struct for the Kafka Proxy.
func (c App) ProxyConfig() (*kafka.ProxyConfig, error) {
return c.KafkaProxy.ProxyConfig(c.AsyncAPIDoc, c.Debug)
}

type pipeSeparatedValues struct {
Values []string
}

func (b *pipeSeparatedValues) Set(value string) error {
b.Values = strings.Split(value, "|")
return nil
}
110 changes: 110 additions & 0 deletions config/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package config

import (
"fmt"
"net"
"strings"

"github.com/asyncapi/event-gateway/asyncapi"
v2 "github.com/asyncapi/event-gateway/asyncapi/v2"
"github.com/asyncapi/event-gateway/kafka"
"github.com/pkg/errors"
)

// KafkaProxy holds the config for later configuring a Kafka proxy.
type KafkaProxy struct {
BrokerFromServer string `split_words:"true"`
BrokersMapping pipeSeparatedValues `split_words:"true"`
BrokersDialMapping pipeSeparatedValues `split_words:"true"`
ExtraFlags pipeSeparatedValues `split_words:"true"`
}

// ProxyConfig creates a config struct for the Kafka Proxy based on a given AsyncAPI doc (if provided).
func (c *KafkaProxy) ProxyConfig(doc []byte, debug bool) (*kafka.ProxyConfig, error) {
if len(doc) == 0 && len(c.BrokersMapping.Values) == 0 {
return nil, errors.New("either AsyncAPIDoc or KafkaProxyBrokersMapping config should be provided")
}

if c.BrokerFromServer != "" && len(doc) == 0 {
return nil, errors.New("AsyncAPIDoc should be provided when setting BrokerFromServer")
}

var kafkaProxyConfig *kafka.ProxyConfig
var err error
if len(doc) > 0 {
kafkaProxyConfig, err = c.configFromDoc(doc)
} else {
kafkaProxyConfig, err = kafka.NewProxyConfig(c.BrokersMapping.Values, kafka.WithDialAddressMapping(c.BrokersDialMapping.Values), kafka.WithExtra(c.ExtraFlags.Values))
}

if err != nil {
return nil, err
}

kafkaProxyConfig.Debug = debug

return kafkaProxyConfig, nil
}

func (c *KafkaProxy) configFromDoc(d []byte) (*kafka.ProxyConfig, error) {
doc := new(v2.Document)
if err := v2.Decode(d, doc); err != nil {
return nil, errors.Wrap(err, "error decoding AsyncAPI json doc to Document struct")
}

if c.BrokerFromServer != "" {
return kafkaProxyConfigFromServer(c.BrokerFromServer, doc)
}

return kafkaProxyConfigFromAllServers(doc.Servers())
}

func isValidKafkaProtocol(s asyncapi.Server) bool {
return strings.HasPrefix(s.Protocol(), "kafka")
}

func kafkaProxyConfigFromAllServers(servers []asyncapi.Server) (*kafka.ProxyConfig, error) {
var brokersMapping []string
var dialAddressMapping []string
for _, s := range servers {
if isValidKafkaProtocol(s) {
l := s.Extension(asyncapi.ExtensionEventGatewayListener)
listenAt, ok := l.(string)
if listenAt == "" || !ok {
return nil, fmt.Errorf("please specify either %s extension, env vars or an AsyncAPI doc in orderr to set the Kafka proxy listener(s)", asyncapi.ExtensionEventGatewayListener)
}

brokersMapping = append(brokersMapping, fmt.Sprintf("%s,%s", s.URL(), listenAt))

if dialMapping := s.Extension(asyncapi.ExtensionEventGatewayDialMapping); dialMapping != nil {
dialAddressMapping = append(dialAddressMapping, fmt.Sprintf("%s,%s", s.URL(), dialMapping))
}
}
}

return kafka.NewProxyConfig(brokersMapping, kafka.WithDialAddressMapping(dialAddressMapping))
}

func kafkaProxyConfigFromServer(name string, doc asyncapi.Document) (*kafka.ProxyConfig, error) {
s, ok := doc.Server(name)
if !ok {
return nil, fmt.Errorf("server %s not found in the provided AsyncAPI doc", name)
}

if !isValidKafkaProtocol(s) {
return nil, fmt.Errorf("server %s has no kafka protocol configured but '%s'", name, s.Protocol())
}

// Only one broker will be configured
_, port, err := net.SplitHostPort(s.URL())
if err != nil {
return nil, errors.Wrapf(err, "error getting port from broker %s. URL:%s", s.Name(), s.URL())
}

var opts []kafka.Option
if dialMapping := s.Extension(asyncapi.ExtensionEventGatewayDialMapping); dialMapping != nil {
opts = append(opts, kafka.WithDialAddressMapping([]string{fmt.Sprintf("%s,%s", s.URL(), dialMapping)}))
}

return kafka.NewProxyConfig([]string{fmt.Sprintf("%s,:%s", s.URL(), port)}, opts...)
}
85 changes: 85 additions & 0 deletions config/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package config

import (
"testing"

"github.com/asyncapi/event-gateway/kafka"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

//nolint:funlen
func TestKafkaProxy_ProxyConfig(t *testing.T) {
tests := []struct {
name string
config *KafkaProxy
doc []byte
expectedProxyConfig *kafka.ProxyConfig
expectedErr error
}{
{
name: "Valid config. Only one broker from doc",
config: &KafkaProxy{
BrokerFromServer: "test",
},
expectedProxyConfig: &kafka.ProxyConfig{
BrokersMapping: []string{"broker.mybrokers.org:9092,:9092"},
},
doc: []byte(`testdata/simple-kafka.yaml`),
},
{
name: "Valid config. Only broker mapping",
config: &KafkaProxy{
BrokersMapping: pipeSeparatedValues{Values: []string{"broker.mybrokers.org:9092,:9092"}},
},
expectedProxyConfig: &kafka.ProxyConfig{
BrokersMapping: []string{"broker.mybrokers.org:9092,:9092"},
},
},
{
name: "Valid config. Broker mapping + Dial mapping",
config: &KafkaProxy{
BrokersMapping: pipeSeparatedValues{Values: []string{"broker.mybrokers.org:9092,:9092"}},
BrokersDialMapping: pipeSeparatedValues{Values: []string{"broker.mybrokers.org:9092,192.168.1.10:9092"}},
},
expectedProxyConfig: &kafka.ProxyConfig{
BrokersMapping: []string{"broker.mybrokers.org:9092,:9092"},
DialAddressMapping: []string{"broker.mybrokers.org:9092,192.168.1.10:9092"},
},
},
{
name: "Invalid config. No broker mapping",
config: &KafkaProxy{},
expectedErr: errors.New("either AsyncAPIDoc or KafkaProxyBrokersMapping config should be provided"),
},
{
name: "Invalid config. Both broker and proxy can't listen to the same port within same host",
config: &KafkaProxy{
BrokersMapping: pipeSeparatedValues{Values: []string{"localhost:9092,:9092"}},
},
expectedErr: errors.New("broker and proxy can't listen to the same port on the same host. Broker is already listening at localhost:9092. Please configure a different listener port"),
},
{
name: "Invalid config. Both broker and proxy are the same",
config: &KafkaProxy{
BrokersMapping: pipeSeparatedValues{Values: []string{"broker.mybrokers.org:9092,broker.mybrokers.org:9092"}},
},
expectedErr: errors.New("broker and proxy can't listen to the same port on the same host. Broker is already listening at broker.mybrokers.org:9092. Please configure a different listener port"),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
proxyConfig, err := test.config.ProxyConfig(test.doc, false)
if test.expectedErr != nil {
assert.EqualError(t, err, test.expectedErr.Error())
} else {
assert.NoError(t, err)
}

if test.expectedProxyConfig != nil {
assert.EqualValues(t, test.expectedProxyConfig, proxyConfig)
}
})
}
}
21 changes: 21 additions & 0 deletions config/testdata/simple-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
asyncapi: '2.0.0'
info:
title: Test
version: '1.0.0'
servers:
test:
url: broker.mybrokers.org:9092
protocol: kafka
channels:
events:
publish:
operationId: onEvent
message:
name: event
payload:
type: object
properties:
id:
type: integer
minimum: 0
description: Id of the event.
Loading

0 comments on commit 28b67d6

Please sign in to comment.