Skip to content
This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

Commit

Permalink
feat: configure Kafka proxy from an AsyncAPI doc (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
smoya authored Jul 12, 2021
1 parent fc10977 commit e9411d1
Show file tree
Hide file tree
Showing 10 changed files with 589 additions and 10 deletions.
14 changes: 14 additions & 0 deletions asyncapi/decode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package asyncapi

// Decoder decodes an AsyncAPI document (several formats can be supported).
// See https://github.com/asyncapi/parser-go#overview for the minimum supported schemas.
type Decoder interface {
Decode([]byte, interface{}) error
}

// DecodeFunc is a helper func that implements the Decoder interface.
type DecodeFunc func([]byte, interface{}) error

func (d DecodeFunc) Decode(b []byte, dst interface{}) error {
return d(b, dst)
}
48 changes: 48 additions & 0 deletions asyncapi/document.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package asyncapi

// Document is an object representing an AsyncAPI document.
// It's API implements https://github.com/asyncapi/parser-api/blob/master/docs/v1.md.
type Document interface {
Extendable
Servers() []Server
HasServers() bool
}

// Server is an object representing a message broker, a server or any other kind of computer program capable of
// sending and/or receiving data.
type Server interface {
Extendable
Name() string
HasName() bool
Description() string
HasDescription() bool
URL() string
HasURL() bool
Protocol() string
HasProtocol() bool
Variables() []ServerVariable
}

// ServerVariable is an object representing a Server Variable for server URL template substitution.
type ServerVariable interface {
Extendable
Name() string
HasName() bool
DefaultValue() string
AllowedValues() []string // Parser API spec says any[], but AsyncAPI mentions is []string
}

// Extendable means the object can have extensions.
// The extensions properties are implemented as patterned fields that are always prefixed by "x-".
// See https://www.asyncapi.com/docs/specifications/v2.0.0#specificationExtensions.
type Extendable interface {
HasExtension(name string) bool
Extension(name string) interface{}
}

// Identifiable identifies objects. Some objects can have fields that identify themselves as unique resources.
// For example: `id` and `name` fields.
type Identifiable interface {
IDField() string
ID() string
}
68 changes: 68 additions & 0 deletions asyncapi/v2/decode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package v2

import (
"bytes"
"encoding/json"
"reflect"

"github.com/asyncapi/event-gateway/asyncapi"
"github.com/asyncapi/parser-go/pkg/parser"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)

// Decode implements the Decoder interface. Decodes AsyncAPI V2.x.x documents.
func Decode(b []byte, dst interface{}) error {
r, err := parser.NewReader(string(b)) // parser should provide another method for parsing []byte
if err != nil {
return errors.Wrap(err, "error reading AsyncAPI doc")
}

p, err := parser.New()
if err != nil {
return err
}

w := bytes.NewBuffer(nil)
if err := p(r, w); err != nil {
return errors.Wrap(err, "error parsing AsyncAPI doc")
}

raw := make(map[string]interface{})
if err := json.Unmarshal(w.Bytes(), &raw); err != nil {
return err
}

dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: setModelIdentifierHook,
Squash: true,
Result: dst,
})
if err != nil {
return err
}

return dec.Decode(raw)
}

// setModelIdentifierHook is a hook for the mapstructure decoder.
// It checks if the destination field is a map of Identifiable elements and sets the proper identifier (name, id, etc) to it.
// Example: Useful for storing the name of the server in the Server struct (AsyncAPI doc does not have such field because it assumes the name is the key of the map).
func setModelIdentifierHook(from reflect.Type, to reflect.Type, data interface{}) (interface{}, error) {
if from.Kind() != reflect.Map || to.Kind() != reflect.Map {
return data, nil
}

identifiableInterface := reflect.TypeOf((*asyncapi.Identifiable)(nil)).Elem()
if to.Key() != reflect.TypeOf("string") || !to.Elem().Implements(identifiableInterface) {
return data, nil
}

fieldName := reflect.New(to.Elem()).Interface().(asyncapi.Identifiable).IDField()
for k, v := range data.(map[string]interface{}) {
// setting the value directly in the raw map. The struct needs to keep the mapstructure field tag so it unmarshals the field.
v.(map[string]interface{})[fieldName] = k
}

return data, nil
}
88 changes: 88 additions & 0 deletions asyncapi/v2/decode_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package v2

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDecodeFromFile(t *testing.T) {
doc := new(Document)
require.NoError(t, Decode([]byte("testdata/example-kafka.yaml"), doc))

require.Len(t, doc.Servers(), 1)
s := doc.Servers()[0]

assert.True(t, s.HasName())
assert.Equal(t, "test", s.Name())
assert.True(t, s.HasDescription())
assert.Equal(t, "Test broker", s.Description())
assert.True(t, s.HasProtocol())
assert.Equal(t, "kafka-secure", s.Protocol())
assert.True(t, s.HasURL())
assert.Equal(t, "localhost:9092", s.URL())
assert.True(t, s.HasExtension("x-eventgateway-listener"))
assert.Equal(t, "proxy:28002", s.Extension("x-eventgateway-listener"))
assert.True(t, s.HasExtension("x-eventgateway-dial-mapping"))
assert.Equal(t, "broker:9092", s.Extension("x-eventgateway-dial-mapping"))
assert.Empty(t, s.Variables())
}

//nolint:misspell
func TestDecodeFromPlainText(t *testing.T) {
raw := []byte(`
asyncapi: '2.0.0'
info:
title: Streetlights API
version: '1.0.0'
description: |
The Smartylighting Streetlights API allows you
to remotely manage the city lights.
license:
name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'
servers:
mosquitto:
url: mqtt://test.mosquitto.org
protocol: mqtt
channels:
light/measured:
publish:
summary: Inform about environmental lighting conditions for a particular streetlight.
operationId: onLightMeasured
message:
name: LightMeasured
payload:
type: object
properties:
id:
type: integer
minimum: 0
description: Id of the streetlight.
lumens:
type: integer
minimum: 0
description: Light intensity measured in lumens.
sentAt:
type: string
format: date-time
description: Date and time when the message was sent.`)

doc := new(Document)
require.NoError(t, Decode(raw, doc))

require.Len(t, doc.Servers(), 1)
s := doc.Servers()[0]

assert.True(t, s.HasName())
assert.Equal(t, "mosquitto", s.Name())
assert.False(t, s.HasDescription())
assert.True(t, s.HasProtocol())
assert.Equal(t, "mqtt", s.Protocol())
assert.True(t, s.HasURL())
assert.Equal(t, "mqtt://test.mosquitto.org", s.URL())
assert.False(t, s.HasExtension("x-eventgateway-listener"))
assert.False(t, s.HasExtension("x-eventgateway-dial-mapping"))
assert.Empty(t, s.Variables())
}
163 changes: 163 additions & 0 deletions asyncapi/v2/testdata/example-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
asyncapi: '2.0.0'
info:
title: Streetlights Kafka API
version: '1.0.0'
description: |
The Smartylighting Streetlights API allows you to remotely manage the city lights.
### Check out its awesome features:
* Turn a specific streetlight on/off 🌃
* Dim a specific streetlight 😎
* Receive real-time information about environmental lighting conditions 📈
license:
name: Apache 2.0
url: https://www.apache.org/licenses/LICENSE-2.0

servers:
test:
url: localhost:9092
protocol: kafka-secure
description: Test broker
security:
- saslScram: []
x-eventgateway-listener: proxy:28002
x-eventgateway-dial-mapping: broker:9092 # optional.

defaultContentType: application/json

channels:
smartylighting.streetlights.1.0.event.{streetlightId}.lighting.measured:
description: The topic on which measured values may be produced and consumed.
parameters:
streetlightId:
$ref: '#/components/parameters/streetlightId'
publish:
summary: Inform about environmental lighting conditions of a particular streetlight.
operationId: receiveLightMeasurement
traits:
- $ref: '#/components/operationTraits/kafka'
message:
$ref: '#/components/messages/lightMeasured'

smartylighting.streetlights.1.0.action.{streetlightId}.turn.on:
parameters:
streetlightId:
$ref: '#/components/parameters/streetlightId'
subscribe:
operationId: turnOn
traits:
- $ref: '#/components/operationTraits/kafka'
message:
$ref: '#/components/messages/turnOnOff'

smartylighting.streetlights.1.0.action.{streetlightId}.turn.off:
parameters:
streetlightId:
$ref: '#/components/parameters/streetlightId'
subscribe:
operationId: turnOff
traits:
- $ref: '#/components/operationTraits/kafka'
message:
$ref: '#/components/messages/turnOnOff'

smartylighting.streetlights.1.0.action.{streetlightId}.dim:
parameters:
streetlightId:
$ref: '#/components/parameters/streetlightId'
subscribe:
operationId: dimLight
traits:
- $ref: '#/components/operationTraits/kafka'
message:
$ref: '#/components/messages/dimLight'

components:
messages:
lightMeasured:
name: lightMeasured
title: Light measured
summary: Inform about environmental lighting conditions of a particular streetlight.
contentType: application/json
traits:
- $ref: '#/components/messageTraits/commonHeaders'
payload:
$ref: "#/components/schemas/lightMeasuredPayload"
turnOnOff:
name: turnOnOff
title: Turn on/off
summary: Command a particular streetlight to turn the lights on or off.
traits:
- $ref: '#/components/messageTraits/commonHeaders'
payload:
$ref: "#/components/schemas/turnOnOffPayload"
dimLight:
name: dimLight
title: Dim light
summary: Command a particular streetlight to dim the lights.
traits:
- $ref: '#/components/messageTraits/commonHeaders'
payload:
$ref: "#/components/schemas/dimLightPayload"

schemas:
lightMeasuredPayload:
type: object
properties:
lumens:
type: integer
minimum: 0
description: Light intensity measured in lumens.
sentAt:
$ref: "#/components/schemas/sentAt"
turnOnOffPayload:
type: object
properties:
command:
type: string
enum:
- on
- off
description: Whether to turn on or off the light.
sentAt:
$ref: "#/components/schemas/sentAt"
dimLightPayload:
type: object
properties:
percentage:
type: integer
description: Percentage to which the light should be dimmed to.
minimum: 0
maximum: 100
sentAt:
$ref: "#/components/schemas/sentAt"
sentAt:
type: string
format: date-time
description: Date and time when the message was sent.

securitySchemes:
saslScram:
type: userPassword
description: Provide your username and password for SASL/SCRAM authentication

parameters:
streetlightId:
description: The ID of the streetlight.
schema:
type: string

messageTraits:
commonHeaders:
headers:
type: object
properties:
my-app-header:
type: integer
minimum: 0
maximum: 100

operationTraits:
kafka:
bindings:
kafka:
clientId: my-app-id
Loading

0 comments on commit e9411d1

Please sign in to comment.