Skip to content
This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

feat: configure extra message route to publish validationErrors to Kafka topic #61

Merged
merged 4 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 35 additions & 33 deletions asyncapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,16 @@ channels:
- asyncapi-event-gateway-demo-validation
subscribe:
message:
$ref: '#/components/messages/ValidationError'
$ref: '#/components/messages/invalidMessage'
event-gateway-demo-validation:
description: Validation errors are published to and consumed from it. AsyncAPI Event-gateway is the only user of this channel. It can be consumed and exposed via `event-gateway-demo-validation-events` channel (`asyncapi-event-gateway-demo-validation` ws server).
x-servers: # Based on https://github.com/asyncapi/spec/pull/531
- asyncapi-kafka-test
subscribe:
message:
$ref: '#/components/messages/ValidationError'
$ref: '#/components/messages/invalidMessage'
components:
messages:
ValidationError:
payload:
type: object
properties:
ts:
type: string
description: RFC-3339 date-time. Date and time when the message was validated.
errors:
type: array
description: Array of string. Validation errors.
msg:
$ref: '#/components/schemas/Message'
examples:
- payload:
ts: '2021-09-10T12:04:18:475203609Z'
errors: [ 'lumens: Invalid type. Expected: integer, given: string' ]
msg:
context:
channel: 'event-gateway-demo'
key: 'YXN5bmNhcGktd2FzLWhlcmU='
value: 'eyJsdW1lbnMiOiAid2hhdGV2ZXIifQ=='
# lightMeasured is copied from the Streetlights tutorial instead of using references due to a bug in parser-go: https://github.com/asyncapi/parser-go/issues/82
lightMeasured:
name: lightMeasured
Expand All @@ -73,24 +52,47 @@ components:
- $ref: '#/components/messageTraits/commonHeaders'
payload:
$ref: "#/components/schemas/lightMeasuredPayload"
invalidMessage:
title: Invalid message
summary: Message with a Validation Error.
contentType: application/json
payload:
$ref: "#/components/schemas/sentMessage"
headers:
type: object
properties:
_asyncapi_eg_channel:
type: string
description: AsyncAPI Channel where the message was published to.
_asyncapi_eg_validation_error:
$ref: '#/components/schemas/validationError'
schemas:
Message:
sentMessage:
type: object
properties:
key:
UUID:
type: string
description: Kafka message key (base64).
value:
description: Unique identifier of message. I.e. Kafka message key.
Payload:
type: string
description: Kafka message value (base64).
context:
$ref: '#/components/schemas/MessageContext'
MessageContext:
description: Message value. I.e. Kafka message (base64).
examples:
- UUID: 'YXN5bmNhcGktd2FzLWhlcmU='
Payload: 'eyJsdW1lbnMiOiAid2hhdGV2ZXIifQ=='
validationError:
type: object
properties:
channel:
ts:
type: string
description: Name of the channel the message was published to.
description: RFC-3339 date-time. Date and time when the message was validated.
errors:
type: array
description: Array of string. Validation errors.
items:
type: string
examples:
- ts: '2021-09-10T12:04:18:475203609Z'
errors: [ 'lumens: Invalid type. Expected: integer, given: string' ]
lightMeasuredPayload:
type: object
properties:
Expand Down
105 changes: 82 additions & 23 deletions config/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,28 @@ import (
"net"
"strings"

watermillkafka "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/asyncapi/event-gateway/asyncapi"
v2 "github.com/asyncapi/event-gateway/asyncapi/v2"
"github.com/asyncapi/event-gateway/kafka"
"github.com/asyncapi/event-gateway/message"
"github.com/asyncapi/event-gateway/message/handler"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// KafkaProxy holds the config for later configuring a Kafka proxy.
type KafkaProxy struct {
BrokerFromServer string `split_words:"true" desc:"When configuring from an AsyncAPI doc, this allows the user to only configure one server instead of all"`
MessageValidation MessageValidation `split_words:"true" desc:""`
BrokerFromServer string `split_words:"true" desc:"When configuring from an AsyncAPI doc, this allows the user to only configure one server instead of all"`
MessageValidation MessageValidation `split_words:"true"`
TLS *kafka.TLSConfig
ExtraFlags pipeSeparatedValues `split_words:"true" desc:"Advanced configuration. Configure any flag from https://github.com/grepplabs/kafka-proxy/blob/4f3b89fbaecb3eb82426f5dcff5f76188ea9a9dc/cmd/kafka-proxy/server.go#L85-L195. Multiple values can be configured by using pipe separation (|)"`
}

// MessageValidation holds the config about message validation.
type MessageValidation struct {
Enabled bool `default:"true" desc:"Enable or disable validation of Kafka messages"`
Notifier message.ValidationErrorNotifier `envconfig:"-"`
}

// NotifyValidationErrorOnChan sets a channel as ValidationError notifier.
func NotifyValidationErrorOnChan(errChan chan *message.ValidationError) Opt {
return func(app *App) {
app.KafkaProxy.MessageValidation.Notifier = message.ValidationErrorToChanNotifier(errChan)
}
Enabled bool `default:"true" desc:"Enable or disable validation of Kafka messages"`
PublishToKafkaTopic string `split_words:"true"`
}

// NewKafkaProxy creates a KafkaProxy with defaults.
Expand All @@ -51,16 +47,6 @@ func (c *KafkaProxy) ProxyConfig(d []byte, debug bool) (*kafka.ProxyConfig, erro
return nil, errors.Wrap(err, "error decoding AsyncAPI json doc to Document struct")
}

opts := []kafka.Option{kafka.WithExtra(c.ExtraFlags.Values), kafka.WithDebug(debug)}
if c.MessageValidation.Enabled {
validator, err := v2.FromDocJSONSchemaMessageValidator(doc)
if err != nil {
return nil, errors.Wrap(err, "error creating message validator")
}

opts = append(opts, kafka.WithMessageHandler(handler.ValidateMessage(validator, c.MessageValidation.Notifier, false)))
}

servers := doc.Servers()
if c.BrokerFromServer != "" {
// Pick up only the specified server
Expand All @@ -76,14 +62,87 @@ func (c *KafkaProxy) ProxyConfig(d []byte, debug bool) (*kafka.ProxyConfig, erro
servers = []asyncapi.Server{s}
}

return kafkaProxyConfigFromServers(servers, opts...)
opts := []kafka.ProxyOption{kafka.WithExtra(c.ExtraFlags.Values), kafka.WithDebug(debug)}
if c.MessageValidation.Enabled {
messageValidationOpts, err := c.generateMessageValidatorOptions(doc, servers)
if err != nil {
return nil, errors.Wrap(err, "error configuring message validation")
}

opts = append(opts, messageValidationOpts...)
}

conf, err := kafkaProxyConfigFromServers(servers, opts...)
if err != nil {
return nil, err
}
conf.TLS = c.TLS

return conf, nil
}

func (c *KafkaProxy) generateMessageValidatorOptions(doc asyncapi.Document, servers []asyncapi.Server) ([]kafka.ProxyOption, error) {
validator, err := v2.FromDocJSONSchemaMessageValidator(doc)
if err != nil {
return nil, errors.Wrap(err, "error creating message validator")
}

opts := []kafka.ProxyOption{kafka.WithMessageHandler(handler.ValidateMessage(validator, false))}

if c.MessageValidation.PublishToKafkaTopic == "" {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the topic is set (from env var atm), then all the re-routing to publish the messages back into Kafka (but another topic) is configured.

logrus.Warn("No topic set for invalid messages. Invalid messages will be discarded")
return opts, nil
}

// Configure Kafka Producer
saramaConf := watermillkafka.DefaultSaramaSyncPublisherConfig()
if c.TLS != nil && c.TLS.Enable {
tlsConfig, err := c.TLS.Config()
if err != nil {
return opts, fmt.Errorf("tls config is invalid. %w", err)
}

saramaConf.Net.TLS.Enable = true
saramaConf.Net.TLS.Config = tlsConfig
}

brokers := make([]string, len(servers))
for i := 0; i < len(servers); i++ {
brokers[i] = servers[i].URL()
}
marshaler := watermillkafka.DefaultMarshaler{}
publisherConf := watermillkafka.PublisherConfig{
Brokers: brokers,
Marshaler: marshaler,
OverwriteSaramaConfig: saramaConf,
}
logger := message.NewWatermillLogrusLogger(logrus.StandardLogger())
publisher, err := watermillkafka.NewPublisher(publisherConf, logger)
if err != nil {
return opts, err
}

subscriberConf := watermillkafka.SubscriberConfig{
Brokers: brokers,
OverwriteSaramaConfig: saramaConf,
Unmarshaler: marshaler,
}

subscriber, err := watermillkafka.NewSubscriber(subscriberConf, logger)
if err != nil {
return opts, err
}

opts = append(opts, kafka.WithMessagePublisher(publisher, c.MessageValidation.PublishToKafkaTopic), kafka.WithMessageSubscriber(subscriber))

return opts, nil
}

func isValidKafkaProtocol(s asyncapi.Server) bool {
return strings.HasPrefix(s.Protocol(), "kafka")
}

func kafkaProxyConfigFromServers(servers []asyncapi.Server, opts ...kafka.Option) (*kafka.ProxyConfig, error) {
func kafkaProxyConfigFromServers(servers []asyncapi.Server, opts ...kafka.ProxyOption) (*kafka.ProxyConfig, error) {
brokersMapping, dialAddressMapping, err := extractAddressMappingFromServers(servers...)
if err != nil {
return nil, err
Expand Down
81 changes: 68 additions & 13 deletions kafka/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package kafka

import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"regexp"
"strings"
Expand All @@ -20,55 +23,104 @@ type ProxyConfig struct {
DialAddressMapping []string
ExtraConfig []string
MessageHandler watermillmessage.HandlerFunc
MessageMiddlewares []watermillmessage.HandlerMiddleware
MessagePublisher watermillmessage.Publisher
PublishToTopic string
MessageSubscriber watermillmessage.Subscriber
TLS *TLSConfig
Debug bool
}

// Option represents a functional configuration for the Proxy.
type Option func(*ProxyConfig) error
// TLSConfig holds configuration for TLS.
type TLSConfig struct {
Enable bool
InsecureSkipVerify bool `split_words:"true"`
ClientCertFile string `split_words:"true"`
ClientKeyFile string `split_words:"true"`
CAChainCertFile string `split_words:"true"`
}

// WithMessageMiddlewares ...
func WithMessageMiddlewares(middlewares ...watermillmessage.HandlerMiddleware) Option {
return func(c *ProxyConfig) error {
c.MessageMiddlewares = append(c.MessageMiddlewares, middlewares...)
return nil
// Config returns a *tls.Config based on current config.
func (c *TLSConfig) Config() (*tls.Config, error) {
cfg := &tls.Config{InsecureSkipVerify: c.InsecureSkipVerify} //nolint:gosec

if c.ClientCertFile != "" && c.ClientKeyFile != "" {
cert, err := tls.LoadX509KeyPair(c.ClientCertFile, c.ClientKeyFile)
if err != nil {
return nil, err
}
cfg.Certificates = []tls.Certificate{cert}
}

if c.CAChainCertFile != "" {
caCertPEMBlock, err := ioutil.ReadFile(c.CAChainCertFile)
if err != nil {
return nil, err
}
rootCAs := x509.NewCertPool()
if ok := rootCAs.AppendCertsFromPEM(caCertPEMBlock); !ok {
return nil, errors.New("Failed to parse client root certificate")
}

cfg.RootCAs = rootCAs
}

return cfg, nil
}

// ProxyOption represents a functional configuration for the Proxy.
type ProxyOption func(*ProxyConfig) error

// WithMessageHandler ...
func WithMessageHandler(handler watermillmessage.HandlerFunc) Option {
func WithMessageHandler(handler watermillmessage.HandlerFunc) ProxyOption {
return func(c *ProxyConfig) error {
c.MessageHandler = handler
return nil
}
}

// WithMessagePublisher configures a publisher where the messages will be published after being handled.
func WithMessagePublisher(publisher watermillmessage.Publisher, topic string) ProxyOption {
return func(c *ProxyConfig) error {
c.MessagePublisher = publisher
c.PublishToTopic = topic
return nil
}
}

// WithMessageSubscriber configures a subscriber subscribed to the messages published by the configured c.MessagePublisher.
func WithMessageSubscriber(subscriber watermillmessage.Subscriber) ProxyOption {
return func(c *ProxyConfig) error {
c.MessageSubscriber = subscriber
return nil
}
}

// WithDebug enables/disables debug.
func WithDebug(enabled bool) Option {
func WithDebug(enabled bool) ProxyOption {
return func(c *ProxyConfig) error {
c.Debug = enabled
return nil
}
}

// WithDialAddressMapping configures Dial Address Mapping.
func WithDialAddressMapping(mapping []string) Option {
func WithDialAddressMapping(mapping []string) ProxyOption {
return func(c *ProxyConfig) error {
c.DialAddressMapping = mapping
return nil
}
}

// WithExtra configures extra parameters.
func WithExtra(extra []string) Option {
func WithExtra(extra []string) ProxyOption {
return func(c *ProxyConfig) error {
c.ExtraConfig = extra
return nil
}
}

// NewProxyConfig creates a new ProxyConfig.
func NewProxyConfig(brokersMapping []string, opts ...Option) (*ProxyConfig, error) {
func NewProxyConfig(brokersMapping []string, opts ...ProxyOption) (*ProxyConfig, error) {
c := &ProxyConfig{BrokersMapping: brokersMapping}
for _, opt := range opts {
if err := opt(c); err != nil {
Expand Down Expand Up @@ -109,6 +161,9 @@ func (c *ProxyConfig) Validate() error {

if c.MessageHandler == nil {
logrus.Warn("There is no message handler configured")
return nil
} else if (c.MessagePublisher != nil && c.PublishToTopic == "") || (c.MessagePublisher == nil && c.PublishToTopic != "") {
return fmt.Errorf("MessagePublisher and PublishToTopic should be set together")
}

return nil
Expand Down
Loading