From 9e3bb7c4bf8f173266d2ec2a68bb963483b6d568 Mon Sep 17 00:00:00 2001 From: Sergio Moya <1083296+smoya@users.noreply.github.com> Date: Mon, 11 Oct 2021 23:53:19 +0200 Subject: [PATCH 1/4] feat: configure extra message route to publish validationErrors to Kafka topic --- asyncapi.yaml | 68 +++++++++--------- config/kafka.go | 105 +++++++++++++++++++++------ kafka/config.go | 81 +++++++++++++++++---- kafka/config_test.go | 19 +++++ kafka/proxy.go | 49 ++++++------- kafka/proxy_test.go | 90 +++++++++++++++-------- main.go | 95 ++++++++++++++++--------- message/handler/validate.go | 27 +++---- message/handler/validate_test.go | 51 ++++++------- message/message.go | 8 --- message/metadata.go | 28 ++++++++ message/test/utils.go | 118 +++++++++++++++++++++++++++++++ message/validation.go | 39 +++++----- message/validation_test.go | 11 +++ 14 files changed, 570 insertions(+), 219 deletions(-) create mode 100644 message/metadata.go create mode 100644 message/test/utils.go diff --git a/asyncapi.yaml b/asyncapi.yaml index cf8d9ff..ed90dfe 100644 --- a/asyncapi.yaml +++ b/asyncapi.yaml @@ -32,37 +32,16 @@ channels: - asyncapi-event-gateway-demo-validation subscribe: message: - $ref: '#/components/messages/ValidationError' + $ref: '#/components/messages/invalidMessage' event-gateway-demo-validation: description: Validation errors are published to and consumed from it. AsyncAPI Event-gateway is the only user of this channel. It can be consumed and exposed via `event-gateway-demo-validation-events` channel (`asyncapi-event-gateway-demo-validation` ws server). x-servers: # Based on https://github.com/asyncapi/spec/pull/531 - asyncapi-kafka-test subscribe: message: - $ref: '#/components/messages/ValidationError' + $ref: '#/components/messages/invalidMessage' components: messages: - ValidationError: - payload: - type: object - properties: - ts: - type: string - description: RFC-3339 date-time. Date and time when the message was validated. - errors: - type: array - description: Array of string. Validation errors. - msg: - $ref: '#/components/schemas/Message' - examples: - - payload: - ts: '2021-09-10T12:04:18:475203609Z' - errors: [ 'lumens: Invalid type. Expected: integer, given: string' ] - msg: - context: - channel: 'event-gateway-demo' - key: 'YXN5bmNhcGktd2FzLWhlcmU=' - value: 'eyJsdW1lbnMiOiAid2hhdGV2ZXIifQ==' # lightMeasured is copied from the Streetlights tutorial instead of using references due to a bug in parser-go: https://github.com/asyncapi/parser-go/issues/82 lightMeasured: name: lightMeasured @@ -73,24 +52,47 @@ components: - $ref: '#/components/messageTraits/commonHeaders' payload: $ref: "#/components/schemas/lightMeasuredPayload" + invalidMessage: + title: Invalid message + summary: Message with a Validation Error. + contentType: application/json + payload: + $ref: "#/components/schemas/message" + headers: + type: object + properties: + _asyncapi_eg_channel: + type: string + description: AsyncAPI Channel where the message was published to. + _asyncapi_eg_validation_error: + $ref: '#/components/schemas/ValidationError' schemas: - Message: + message: type: object properties: - key: + UUID: type: string - description: Kafka message key (base64). - value: + description: Unique identifier of message. I.e. Kafka message key. + Payload: type: string - description: Kafka message value (base64). - context: - $ref: '#/components/schemas/MessageContext' - MessageContext: + description: Message value. I.e. Kafka message (base64). + examples: + - UUID: 'YXN5bmNhcGktd2FzLWhlcmU=' + Payload: 'eyJsdW1lbnMiOiAid2hhdGV2ZXIifQ==' + ValidationError: type: object properties: - channel: + ts: type: string - description: Name of the channel the message was published to. + description: RFC-3339 date-time. Date and time when the message was validated. + errors: + type: array + description: Array of string. Validation errors. + items: + type: string + examples: + - ts: '2021-09-10T12:04:18:475203609Z' + errors: [ 'lumens: Invalid type. Expected: integer, given: string' ] lightMeasuredPayload: type: object properties: diff --git a/config/kafka.go b/config/kafka.go index 74d1328..8901c09 100644 --- a/config/kafka.go +++ b/config/kafka.go @@ -5,32 +5,28 @@ import ( "net" "strings" + watermillkafka "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka" "github.com/asyncapi/event-gateway/asyncapi" v2 "github.com/asyncapi/event-gateway/asyncapi/v2" "github.com/asyncapi/event-gateway/kafka" "github.com/asyncapi/event-gateway/message" "github.com/asyncapi/event-gateway/message/handler" "github.com/pkg/errors" + "github.com/sirupsen/logrus" ) // KafkaProxy holds the config for later configuring a Kafka proxy. type KafkaProxy struct { - BrokerFromServer string `split_words:"true" desc:"When configuring from an AsyncAPI doc, this allows the user to only configure one server instead of all"` - MessageValidation MessageValidation `split_words:"true" desc:""` + BrokerFromServer string `split_words:"true" desc:"When configuring from an AsyncAPI doc, this allows the user to only configure one server instead of all"` + MessageValidation MessageValidation `split_words:"true"` + TLS *kafka.TLSConfig ExtraFlags pipeSeparatedValues `split_words:"true" desc:"Advanced configuration. Configure any flag from https://github.com/grepplabs/kafka-proxy/blob/4f3b89fbaecb3eb82426f5dcff5f76188ea9a9dc/cmd/kafka-proxy/server.go#L85-L195. Multiple values can be configured by using pipe separation (|)"` } // MessageValidation holds the config about message validation. type MessageValidation struct { - Enabled bool `default:"true" desc:"Enable or disable validation of Kafka messages"` - Notifier message.ValidationErrorNotifier `envconfig:"-"` -} - -// NotifyValidationErrorOnChan sets a channel as ValidationError notifier. -func NotifyValidationErrorOnChan(errChan chan *message.ValidationError) Opt { - return func(app *App) { - app.KafkaProxy.MessageValidation.Notifier = message.ValidationErrorToChanNotifier(errChan) - } + Enabled bool `default:"true" desc:"Enable or disable validation of Kafka messages"` + PublishToKafkaTopic string `split_words:"true"` } // NewKafkaProxy creates a KafkaProxy with defaults. @@ -51,16 +47,6 @@ func (c *KafkaProxy) ProxyConfig(d []byte, debug bool) (*kafka.ProxyConfig, erro return nil, errors.Wrap(err, "error decoding AsyncAPI json doc to Document struct") } - opts := []kafka.Option{kafka.WithExtra(c.ExtraFlags.Values), kafka.WithDebug(debug)} - if c.MessageValidation.Enabled { - validator, err := v2.FromDocJSONSchemaMessageValidator(doc) - if err != nil { - return nil, errors.Wrap(err, "error creating message validator") - } - - opts = append(opts, kafka.WithMessageHandler(handler.ValidateMessage(validator, c.MessageValidation.Notifier, false))) - } - servers := doc.Servers() if c.BrokerFromServer != "" { // Pick up only the specified server @@ -76,14 +62,87 @@ func (c *KafkaProxy) ProxyConfig(d []byte, debug bool) (*kafka.ProxyConfig, erro servers = []asyncapi.Server{s} } - return kafkaProxyConfigFromServers(servers, opts...) + opts := []kafka.ProxyOption{kafka.WithExtra(c.ExtraFlags.Values), kafka.WithDebug(debug)} + if c.MessageValidation.Enabled { + messageValidationOpts, err := c.generateMessageValidatorOptions(doc, servers) + if err != nil { + return nil, errors.Wrap(err, "error configuring message validation") + } + + opts = append(opts, messageValidationOpts...) + } + + conf, err := kafkaProxyConfigFromServers(servers, opts...) + if err != nil { + return nil, err + } + conf.TLS = c.TLS + + return conf, nil +} + +func (c *KafkaProxy) generateMessageValidatorOptions(doc asyncapi.Document, servers []asyncapi.Server) ([]kafka.ProxyOption, error) { + validator, err := v2.FromDocJSONSchemaMessageValidator(doc) + if err != nil { + return nil, errors.Wrap(err, "error creating message validator") + } + + opts := []kafka.ProxyOption{kafka.WithMessageHandler(handler.ValidateMessage(validator, false))} + + if c.MessageValidation.PublishToKafkaTopic == "" { + logrus.Warn("No topic set for invalid messages. Invalid messages will be discarded") + return opts, nil + } + + // Configure Kafka Producer + saramaConf := watermillkafka.DefaultSaramaSyncPublisherConfig() + if c.TLS != nil && c.TLS.Enable { + tlsConfig, err := c.TLS.Config() + if err != nil { + return opts, fmt.Errorf("tls config is invalid. %w", err) + } + + saramaConf.Net.TLS.Enable = true + saramaConf.Net.TLS.Config = tlsConfig + } + + brokers := make([]string, len(servers)) + for i := 0; i < len(servers); i++ { + brokers[i] = servers[i].URL() + } + marshaler := watermillkafka.DefaultMarshaler{} + publisherConf := watermillkafka.PublisherConfig{ + Brokers: brokers, + Marshaler: marshaler, + OverwriteSaramaConfig: saramaConf, + } + logger := message.NewWatermillLogrusLogger(logrus.StandardLogger()) + publisher, err := watermillkafka.NewPublisher(publisherConf, logger) + if err != nil { + return opts, err + } + + subscriberConf := watermillkafka.SubscriberConfig{ + Brokers: brokers, + OverwriteSaramaConfig: saramaConf, + Unmarshaler: marshaler, + } + + subscriber, err := watermillkafka.NewSubscriber(subscriberConf, logger) + if err != nil { + return opts, err + } + + opts = append(opts, kafka.WithMessagePublisher(publisher, c.MessageValidation.PublishToKafkaTopic), kafka.WithMessageSubscriber(subscriber)) + + return opts, nil } func isValidKafkaProtocol(s asyncapi.Server) bool { return strings.HasPrefix(s.Protocol(), "kafka") } -func kafkaProxyConfigFromServers(servers []asyncapi.Server, opts ...kafka.Option) (*kafka.ProxyConfig, error) { +func kafkaProxyConfigFromServers(servers []asyncapi.Server, opts ...kafka.ProxyOption) (*kafka.ProxyConfig, error) { brokersMapping, dialAddressMapping, err := extractAddressMappingFromServers(servers...) if err != nil { return nil, err diff --git a/kafka/config.go b/kafka/config.go index b279fb2..deaec4b 100644 --- a/kafka/config.go +++ b/kafka/config.go @@ -1,7 +1,10 @@ package kafka import ( + "crypto/tls" + "crypto/x509" "fmt" + "io/ioutil" "net" "regexp" "strings" @@ -20,31 +23,80 @@ type ProxyConfig struct { DialAddressMapping []string ExtraConfig []string MessageHandler watermillmessage.HandlerFunc - MessageMiddlewares []watermillmessage.HandlerMiddleware + MessagePublisher watermillmessage.Publisher + PublishToTopic string + MessageSubscriber watermillmessage.Subscriber + TLS *TLSConfig Debug bool } -// Option represents a functional configuration for the Proxy. -type Option func(*ProxyConfig) error +// TLSConfig holds configuration for TLS. +type TLSConfig struct { + Enable bool + InsecureSkipVerify bool `split_words:"true"` + ClientCertFile string `split_words:"true"` + ClientKeyFile string `split_words:"true"` + CAChainCertFile string `split_words:"true"` +} -// WithMessageMiddlewares ... -func WithMessageMiddlewares(middlewares ...watermillmessage.HandlerMiddleware) Option { - return func(c *ProxyConfig) error { - c.MessageMiddlewares = append(c.MessageMiddlewares, middlewares...) - return nil +// Config returns a *tls.Config based on current config. +func (c *TLSConfig) Config() (*tls.Config, error) { + cfg := &tls.Config{InsecureSkipVerify: c.InsecureSkipVerify} //nolint:gosec + + if c.ClientCertFile != "" && c.ClientKeyFile != "" { + cert, err := tls.LoadX509KeyPair(c.ClientCertFile, c.ClientKeyFile) + if err != nil { + return nil, err + } + cfg.Certificates = []tls.Certificate{cert} + } + + if c.CAChainCertFile != "" { + caCertPEMBlock, err := ioutil.ReadFile(c.CAChainCertFile) + if err != nil { + return nil, err + } + rootCAs := x509.NewCertPool() + if ok := rootCAs.AppendCertsFromPEM(caCertPEMBlock); !ok { + return nil, errors.New("Failed to parse client root certificate") + } + + cfg.RootCAs = rootCAs } + + return cfg, nil } +// ProxyOption represents a functional configuration for the Proxy. +type ProxyOption func(*ProxyConfig) error + // WithMessageHandler ... -func WithMessageHandler(handler watermillmessage.HandlerFunc) Option { +func WithMessageHandler(handler watermillmessage.HandlerFunc) ProxyOption { return func(c *ProxyConfig) error { c.MessageHandler = handler return nil } } +// WithMessagePublisher configures a publisher where the messages will be published after being handled. +func WithMessagePublisher(publisher watermillmessage.Publisher, topic string) ProxyOption { + return func(c *ProxyConfig) error { + c.MessagePublisher = publisher + c.PublishToTopic = topic + return nil + } +} + +// WithMessageSubscriber configures a subscriber subscribed to the messages published by the configured c.MessagePublisher. +func WithMessageSubscriber(subscriber watermillmessage.Subscriber) ProxyOption { + return func(c *ProxyConfig) error { + c.MessageSubscriber = subscriber + return nil + } +} + // WithDebug enables/disables debug. -func WithDebug(enabled bool) Option { +func WithDebug(enabled bool) ProxyOption { return func(c *ProxyConfig) error { c.Debug = enabled return nil @@ -52,7 +104,7 @@ func WithDebug(enabled bool) Option { } // WithDialAddressMapping configures Dial Address Mapping. -func WithDialAddressMapping(mapping []string) Option { +func WithDialAddressMapping(mapping []string) ProxyOption { return func(c *ProxyConfig) error { c.DialAddressMapping = mapping return nil @@ -60,7 +112,7 @@ func WithDialAddressMapping(mapping []string) Option { } // WithExtra configures extra parameters. -func WithExtra(extra []string) Option { +func WithExtra(extra []string) ProxyOption { return func(c *ProxyConfig) error { c.ExtraConfig = extra return nil @@ -68,7 +120,7 @@ func WithExtra(extra []string) Option { } // NewProxyConfig creates a new ProxyConfig. -func NewProxyConfig(brokersMapping []string, opts ...Option) (*ProxyConfig, error) { +func NewProxyConfig(brokersMapping []string, opts ...ProxyOption) (*ProxyConfig, error) { c := &ProxyConfig{BrokersMapping: brokersMapping} for _, opt := range opts { if err := opt(c); err != nil { @@ -109,6 +161,9 @@ func (c *ProxyConfig) Validate() error { if c.MessageHandler == nil { logrus.Warn("There is no message handler configured") + return nil + } else if c.MessagePublisher != nil && c.PublishToTopic == "" || c.MessagePublisher == nil && c.PublishToTopic != "" { + return fmt.Errorf("MessagePublisher and PublishToTopic should be set together") } return nil diff --git a/kafka/config_test.go b/kafka/config_test.go index 9edbf31..8c64fcf 100644 --- a/kafka/config_test.go +++ b/kafka/config_test.go @@ -3,6 +3,7 @@ package kafka import ( "testing" + messagetest "github.com/asyncapi/event-gateway/message/test" "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) @@ -26,6 +27,24 @@ func TestProxyConfig_Validate(t *testing.T) { DialAddressMapping: []string{"broker.mybrokers.org:9092,192.168.1.10:9092"}, }, }, + { + name: "Invalid config. Message Handler is set, PublishToTopic is set but Publisher is not", + config: ProxyConfig{ + BrokersMapping: []string{"broker.mybrokers.org:9092,:9092"}, + MessageHandler: noopHandler, + PublishToTopic: "foo-topic", + }, + expectedErr: errors.New("MessagePublisher and PublishToTopic should be set together"), + }, + { + name: "Invalid config. Message Handler is set, Publisher is set but PublishToTopic is not", + config: ProxyConfig{ + BrokersMapping: []string{"broker.mybrokers.org:9092,:9092"}, + MessageHandler: noopHandler, + MessagePublisher: messagetest.NoopPublisher{}, + }, + expectedErr: errors.New("MessagePublisher and PublishToTopic should be set together"), + }, { name: "Invalid config. No broker mapping", expectedErr: errors.New("BrokersMapping is mandatory"), diff --git a/kafka/proxy.go b/kafka/proxy.go index 8fc2dd1..d2ea344 100644 --- a/kafka/proxy.go +++ b/kafka/proxy.go @@ -3,6 +3,7 @@ package kafka import ( "bytes" "context" + "fmt" "io" "strings" @@ -17,7 +18,6 @@ import ( kafkaprotocol "github.com/grepplabs/kafka-proxy/proxy/protocol" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "golang.org/x/sync/errgroup" ) // Kafka request API Keys. See https://kafka.apache.org/protocol#protocol_api_keys. @@ -33,7 +33,7 @@ const ( var defaultMarshaler = watermillkafka.DefaultMarshaler{} // NewProxy creates a new Kafka Proxy based on a given configuration. -func NewProxy(c *ProxyConfig) (proxy.Proxy, error) { +func NewProxy(c *ProxyConfig, r *watermillmessage.Router) (proxy.Proxy, error) { if c == nil { return nil, errors.New("config should be provided") } @@ -42,12 +42,8 @@ func NewProxy(c *ProxyConfig) (proxy.Proxy, error) { return nil, err } - r, err := watermillmessage.NewRouter(watermillmessage.RouterConfig{}, message.NewWatermillLogrusLogger(logrus.StandardLogger())) - if err != nil { - return nil, err - } // Yeah, not a good practice at all but I guess it's fine for now. - kafkaproxy.ActualDefaultRequestHandler.RequestKeyHandlers.Set(RequestAPIKeyProduce, NewProduceRequestHandler(r, c.MessageHandler, c.MessageMiddlewares...)) + kafkaproxy.ActualDefaultRequestHandler.RequestKeyHandlers.Set(RequestAPIKeyProduce, NewProduceRequestHandler(r, c.MessageHandler, c.MessagePublisher, c.PublishToTopic)) // Setting some defaults. _ = server.Server.Flags().Set("default-listener-ip", "0.0.0.0") // Binding to all local network interfaces. Needed for external calls. @@ -60,6 +56,14 @@ func NewProxy(c *ProxyConfig) (proxy.Proxy, error) { _ = server.Server.Flags().Set("log-level", "debug") } + if c.TLS != nil && c.TLS.Enable { + _ = server.Server.Flags().Set("tls-enable", "true") + _ = server.Server.Flags().Set("tls-insecure-skip-verify", fmt.Sprintf("%v", c.TLS.InsecureSkipVerify)) + _ = server.Server.Flags().Set("tls-client-cert-file", c.TLS.ClientCertFile) + _ = server.Server.Flags().Set("tls-client-key-file", c.TLS.ClientKeyFile) + _ = server.Server.Flags().Set("tls-ca-chain-cert-file", c.TLS.CAChainCertFile) + } + for _, v := range c.ExtraConfig { f := strings.Split(v, "=") _ = server.Server.Flags().Set(f[0], f[1]) @@ -74,21 +78,12 @@ func NewProxy(c *ProxyConfig) (proxy.Proxy, error) { } return func(ctx context.Context) error { - defer r.Close() - group, ctx := errgroup.WithContext(ctx) - group.Go(func() error { - return r.Run(ctx) // Note: Can not be called until fully configured. - }) - group.Go(func() error { - return server.Server.Execute() - }) - - return group.Wait() + return server.Server.Execute() }, nil } // NewProduceRequestHandler creates a new request key handler for the Produce Request. -func NewProduceRequestHandler(r *watermillmessage.Router, handler watermillmessage.HandlerFunc, middlewares ...watermillmessage.HandlerMiddleware) kafkaproxy.KeyHandler { +func NewProduceRequestHandler(r *watermillmessage.Router, handler watermillmessage.HandlerFunc, publisher watermillmessage.Publisher, publishToTopic string) kafkaproxy.KeyHandler { if handler == nil { return &produceRequestHandler{} } @@ -96,17 +91,19 @@ func NewProduceRequestHandler(r *watermillmessage.Router, handler watermillmessa chanConfig := gochannel.Config{ OutputChannelBuffer: 100, // TODO consider making this configurable } - goChannelPubSub := gochannel.NewGoChannel(chanConfig, message.NewWatermillLogrusLogger(logrus.StandardLogger())) - // This time we use a noPubisher handler, so converting the given handler. - h := func(msg *watermillmessage.Message) error { - _, err := handler(msg) - return err + goChannelPubSub := gochannel.NewGoChannel(chanConfig, message.NewWatermillLogrusLogger(logrus.StandardLogger())) + if publisher == nil { + // This time we use a noPublisher handler, so converting the given handler. + h := func(msg *watermillmessage.Message) error { + _, err := handler(msg) + return err + } + r.AddNoPublisherHandler("on-produce-request", messagesChannelName, goChannelPubSub, h) + } else { + r.AddHandler("on-produce-request", messagesChannelName, goChannelPubSub, publishToTopic, publisher, handler) } - r.AddNoPublisherHandler("on-produce-request", messagesChannelName, goChannelPubSub, h) - r.AddMiddleware(middlewares...) - return &produceRequestHandler{ publisher: goChannelPubSub, } diff --git a/kafka/proxy_test.go b/kafka/proxy_test.go index 3565217..2277de6 100644 --- a/kafka/proxy_test.go +++ b/kafka/proxy_test.go @@ -5,12 +5,11 @@ import ( "context" "encoding/binary" "hash/crc32" - "sync/atomic" "testing" "time" watermillmessage "github.com/ThreeDotsLabs/watermill/message" - "github.com/asyncapi/event-gateway/message" + messagetest "github.com/asyncapi/event-gateway/message/test" "github.com/asyncapi/event-gateway/proxy" kafkaproxy "github.com/grepplabs/kafka-proxy/proxy" kafkaprotocol "github.com/grepplabs/kafka-proxy/proxy/protocol" @@ -45,7 +44,15 @@ func TestNewKafka(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - p, err := NewProxy(test.c) + r := messagetest.NewRouter(t) + p, err := NewProxy(test.c, r) + + go func() { + require.NoError(t, r.Run(context.Background())) + }() + + <-r.Running() // Do not start test until the router is fully up and running. + if test.expectedErr != nil { assert.EqualError(t, err, test.expectedErr.Error()) assert.Nil(t, p) @@ -67,13 +74,22 @@ func TestProduceRequestHandler_Handle(t *testing.T) { extraCheck func(t *testing.T, handlerCalledTimes int) expectedLoggedErr string sleepBeforeCheck time.Duration - handler watermillmessage.HandlerFunc + handler func(t *testing.T) (watermillmessage.HandlerFunc, chan struct{}) + publisher func(t *testing.T, topic string) (watermillmessage.Publisher, chan struct{}) + publishToTopic string }{ { - name: "Handler success", + name: "Handler success. No publisher is set.", + request: generateProduceRequestV8("valid message"), + shouldReply: true, + }, + { + name: "Handler success. Publisher is set.", request: generateProduceRequestV8("valid message"), shouldReply: true, - handler: noopHandler, + publisher: func(t *testing.T, topic string) (watermillmessage.Publisher, chan struct{}) { + return messagetest.ReliablePublisher(t, topic, 1, time.Second*2) // at least 1 message during max 2 seconds + }, }, { name: "Handler error means Nack. Message is resent infinitely.", @@ -83,8 +99,11 @@ func TestProduceRequestHandler_Handle(t *testing.T) { extraCheck: func(t *testing.T, handlerCalls int) { assert.Greater(t, handlerCalls, 1) }, - handler: func(msg *watermillmessage.Message) ([]*watermillmessage.Message, error) { - return nil, errors.New("message is invalid, meaning Nack will be sent and message will be resent infinitely") // This will send a Nack, meaning message will be resent. + handler: func(t *testing.T) (watermillmessage.HandlerFunc, chan struct{}) { + h := func(msg *watermillmessage.Message) ([]*watermillmessage.Message, error) { + return nil, errors.New("message is invalid, meaning Nack will be sent and message will be resent infinitely") // This will send a Nack, meaning message will be resent. + } + return messagetest.AssertCalledHandlerFunc(t, h, 5, 100*time.Millisecond) // at least 5 calls during max 100 millis }, sleepBeforeCheck: time.Millisecond, // letting the handlers be called several times due to Nack produced by returning an error. }, @@ -94,38 +113,52 @@ func TestProduceRequestHandler_Handle(t *testing.T) { apiKey: int16(42), shouldReply: true, shouldSkipRequest: true, + handler: func(_ *testing.T) (watermillmessage.HandlerFunc, chan struct{}) { + return func(msg *watermillmessage.Message) ([]*watermillmessage.Message, error) { + return nil, errors.New("this handler should never be called") + }, nil + }, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { log := logrustest.NewGlobal() - r, err := watermillmessage.NewRouter(watermillmessage.RouterConfig{}, message.NewWatermillLogrusLogger(logrus.StandardLogger())) - require.NoError(t, err) + handler := noopHandler + if test.handler != nil { + h, done := test.handler(t) - var handlerCalls uint32 - // Wrapping handler so we can add a spy. - handler := func(msg *watermillmessage.Message) ([]*watermillmessage.Message, error) { - atomic.AddUint32(&handlerCalls, 1) - if test.handler != nil { - return test.handler(msg) - } + handler = h + defer func() { + if done != nil { + <-done // wait until handler has been called n times + } + }() + } - return noopHandler(msg) + var pub watermillmessage.Publisher + if test.publisher != nil { + p, done := test.publisher(t, t.Name()) + t.Cleanup(func() { + assert.NoError(t, pub.Close()) + }) + + pub = p + defer func() { + if done != nil { + <-done // wait until publisher has published all messages + } + }() } - h := NewProduceRequestHandler(r, handler) + + r := messagetest.NewRouterWithLogs(t, logrus.StandardLogger()) + h := NewProduceRequestHandler(r, handler, pub, t.Name()) go func() { - // Running the router require.NoError(t, r.Run(context.Background())) }() - t.Cleanup(func() { - // Closing the router - require.NoError(t, r.Close()) - }) - // Do not start test until the router is fully up and running. - <-r.Running() + <-r.Running() // Do not start test until the router is fully up and running. kv := &kafkaprotocol.RequestKeyVersion{ ApiVersion: 8, // All test data was grabbed from a Produce Request version 8. @@ -147,12 +180,9 @@ func TestProduceRequestHandler_Handle(t *testing.T) { time.Sleep(test.sleepBeforeCheck) } - if test.extraCheck != nil { - test.extraCheck(t, int(atomic.LoadUint32(&handlerCalls))) - } - if test.expectedLoggedErr != "" { entry := log.LastEntry() + require.NotNil(t, entry) require.Contains(t, entry.Data, logrus.ErrorKey) assert.EqualError(t, entry.Data[logrus.ErrorKey].(error), test.expectedLoggedErr) } else { diff --git a/main.go b/main.go index 00be7ea..fa1554d 100644 --- a/main.go +++ b/main.go @@ -10,6 +10,7 @@ import ( "syscall" "time" + watermillmessage "github.com/ThreeDotsLabs/watermill/message" "github.com/asyncapi/event-gateway/config" "github.com/asyncapi/event-gateway/kafka" "github.com/asyncapi/event-gateway/message" @@ -17,14 +18,13 @@ import ( "github.com/kelseyhightower/envconfig" "github.com/olahol/melody" "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" ) const configPrefix = "eventgateway" func main() { - validationErrChan := make(chan *message.ValidationError) - c := config.NewApp(config.NotifyValidationErrorOnChan(validationErrChan)) - + c := config.NewApp() if err := envconfig.Process(configPrefix, c); err != nil { _ = envconfig.Usage(configPrefix, c) logrus.WithError(err).Fatal() @@ -40,61 +40,92 @@ func main() { logrus.WithError(err).Fatal() } - kafkaProxy, err := kafka.NewProxy(kafkaProxyConfig) + watermillLogger := message.NewWatermillLogrusLogger(logrus.StandardLogger()) + messageRouter, err := watermillmessage.NewRouter(watermillmessage.RouterConfig{}, watermillLogger) if err != nil { - _ = envconfig.Usage(configPrefix, c) logrus.WithError(err).Fatal() } - - m := melody.New() + defer messageRouter.Close() ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + m := melody.New() handleInterruptions(cancel, func() error { return m.CloseWithMsg(melody.FormatCloseMessage(1000, "The server says goodbye :)")) }) + if kafkaProxyConfig.MessageSubscriber != nil { + defer kafkaProxyConfig.MessageSubscriber.Close() + defer kafkaProxyConfig.MessagePublisher.Close() + messageRouter.AddNoPublisherHandler("consume-validation-errors-from-kafka", c.KafkaProxy.MessageValidation.PublishToKafkaTopic, kafkaProxyConfig.MessageSubscriber, validationErrorsHandler(m)) + } + + kafkaProxy, err := kafka.NewProxy(kafkaProxyConfig, messageRouter) + if err != nil { + _ = envconfig.Usage(configPrefix, c) + logrus.WithError(err).Fatal() + } + + runWebsocketServer(c.WSServerPort, "/ws", m) + + group, ctx := errgroup.WithContext(ctx) + group.Go(func() error { + return messageRouter.Run(ctx) // Note: Can not be called until fully configured. + }) + group.Go(func() error { + return kafkaProxy(ctx) + }) + + if err := group.Wait(); err != nil { + logrus.WithError(err).Fatal() + } +} + +func runWebsocketServer(port int, path string, m *melody.Melody) { r := chi.NewRouter() - r.Get("/ws", func(w http.ResponseWriter, r *http.Request) { + r.Get(path, func(w http.ResponseWriter, r *http.Request) { _ = m.HandleRequest(w, r) }) go func() { - address := fmt.Sprintf(":%v", c.WSServerPort) + address := fmt.Sprintf(":%v", port) logrus.Infof("Websocket server listening on %s", address) if err := http.ListenAndServe(address, r); err != nil { logrus.WithError(err).Fatal("error running websocket server") } }() - - go handleValidationErrors(ctx, validationErrChan, m) - if err := kafkaProxy(context.Background()); err != nil { - logrus.WithError(err).Fatal() - } } -func handleValidationErrors(ctx context.Context, validationErrChan chan *message.ValidationError, m *melody.Melody) { - for { - select { - case validationErr, ok := <-validationErrChan: - if !ok { - return - } +func validationErrorsHandler(m *melody.Melody) watermillmessage.NoPublishHandlerFunc { + return func(msg *watermillmessage.Message) error { + validationError, err := message.ValidationErrorFromMessage(msg) + if err != nil { + logrus.WithError(err).WithField("uuid", msg.UUID).Error("Couldn't determine if the message is rather invalid or not") + return nil // no retry + } - content, err := json.Marshal(validationErr) - if err != nil { - logrus.WithError(err).Error("error encoding validationError") - content = []byte(`For some reason, this validation error can't be seen. Please drop us an issue on github.'`) - } + if validationError == nil { + return nil + } - logrus.WithError(validationErr).Debug("Message is invalid") + content, err := json.Marshal(msg) + if err != nil { + logrus.WithError(err).Error("error marshaling message") + content = []byte(fmt.Sprintf( + "The message %s is invalid: %s. However, there was an error during encoding and couldn't be shown completely. Please drop us an issue on github.", + msg.UUID, + validationError.Error(), + )) + } - if err := m.Broadcast(content); err != nil { - logrus.WithError(err).Error("error broadcasting message to all ws sessions") - } - case <-ctx.Done(): - return + logrus.WithError(validationError).Debug("Message is invalid") + + if err := m.Broadcast(content); err != nil { + logrus.WithError(err).Error("error broadcasting message to all ws sessions") } + + return nil } } diff --git a/message/handler/validate.go b/message/handler/validate.go index 4e61f45..3226d23 100644 --- a/message/handler/validate.go +++ b/message/handler/validate.go @@ -1,6 +1,8 @@ package handler import ( + "encoding/json" + watermillmessage "github.com/ThreeDotsLabs/watermill/message" "github.com/asyncapi/event-gateway/message" "github.com/pkg/errors" @@ -10,9 +12,10 @@ import ( // ErrMessageIsInvalid is the error used when a message did not pass validation and failWhenInvalid option was set to true. var ErrMessageIsInvalid = errors.New("Message is invalid and failWhenInvalid was set to true") -// ValidateMessage validates a message. Optionally notifies if a notifier is set. -// By default, next handler will always be called, including whenever the message is invalid. If you want to make it fail then, set failWhenInvalid to true. -func ValidateMessage(validator message.Validator, notifier message.ValidationErrorNotifier, failWhenInvalid bool) watermillmessage.HandlerFunc { +// ValidateMessage validates a message. If invalid, It injects the validation error into the message Metadata. +// By default, next handler will always be called, including whenever the message is invalid. +// In case you want to make it fail, set failWhenInvalid to true. +func ValidateMessage(validator message.Validator, failWhenInvalid bool) watermillmessage.HandlerFunc { return func(msg *watermillmessage.Message) ([]*watermillmessage.Message, error) { validationErr, err := validator(msg) if err != nil { @@ -20,19 +23,19 @@ func ValidateMessage(validator message.Validator, notifier message.ValidationErr } if validationErr != nil { - if notifier != nil { - if err := notifier(validationErr); err != nil { - logrus.WithError(err).Error("error notifying message validation error") - } - } else { - logrus.WithError(validationErr).Error("Message is invalid") + if failWhenInvalid { + err = ErrMessageIsInvalid } - if failWhenInvalid { - return nil, ErrMessageIsInvalid + rawErr, err := json.Marshal(validationErr) + if err != nil { + logrus.WithError(err).Error("Error marshaling message.ValidationError") } + + // Injecting the validation error into the message Metadata. + msg.Metadata.Set(message.MetadataValidationError, string(rawErr)) } - return []*watermillmessage.Message{msg}, nil + return []*watermillmessage.Message{msg}, err } } diff --git a/message/handler/validate_test.go b/message/handler/validate_test.go index 5bcf88b..c470450 100644 --- a/message/handler/validate_test.go +++ b/message/handler/validate_test.go @@ -7,15 +7,16 @@ import ( watermillmessage "github.com/ThreeDotsLabs/watermill/message" "github.com/asyncapi/event-gateway/message" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestValidateMessage(t *testing.T) { tests := []struct { - name string - validator message.Validator - notifier message.ValidationErrorNotifier - failWhenInvalid bool - expectedErr error + name string + validator message.Validator + failWhenInvalid bool + expectedValidationErr string + expectedErr error }{ { name: "Message is valid", @@ -24,44 +25,44 @@ func TestValidateMessage(t *testing.T) { }, }, { - name: "Message is invalid. No notifier is set. failWhenInvalid = false", - validator: invalidMessageValidator, + name: "Message is invalid. failWhenInvalid = false", + validator: invalidMessageValidator, + expectedValidationErr: "testing error", }, { - name: "Message is invalid. No notifier is set. failWhenInvalid = true", - failWhenInvalid: true, - expectedErr: ErrMessageIsInvalid, - validator: invalidMessageValidator, - }, - { - name: "Message is invalid. Notifier is set.", - notifier: func(validationError *message.ValidationError) error { - assert.NotNil(t, validationError) - return nil - }, - validator: invalidMessageValidator, + name: "Message is invalid.failWhenInvalid = true", + validator: invalidMessageValidator, + failWhenInvalid: true, + expectedValidationErr: "testing error", + expectedErr: ErrMessageIsInvalid, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { msg := message.New([]byte{}, test.name) - returnedMsgs, err := ValidateMessage(test.validator, test.notifier, test.failWhenInvalid)(msg) + returnedMsgs, err := ValidateMessage(test.validator, test.failWhenInvalid)(msg) if test.expectedErr != nil { assert.EqualError(t, err, test.expectedErr.Error()) - assert.Empty(t, returnedMsgs) } else { assert.NoError(t, err) - assert.Len(t, returnedMsgs, 1) - assert.Same(t, msg, returnedMsgs[0]) + } + + assert.Len(t, returnedMsgs, 1) + assert.Same(t, msg, returnedMsgs[0]) + + validationErr, err := message.ValidationErrorFromMessage(msg) + require.NoError(t, err) + + if test.expectedValidationErr != "" { + assert.Equal(t, test.expectedValidationErr, validationErr.Error()) } }) } } -func invalidMessageValidator(m *watermillmessage.Message) (*message.ValidationError, error) { +func invalidMessageValidator(_ *watermillmessage.Message) (*message.ValidationError, error) { return &message.ValidationError{ Timestamp: time.Now(), - Msg: m, Errors: []string{"testing error"}, }, nil } diff --git a/message/message.go b/message/message.go index 3e984bd..41c106a 100644 --- a/message/message.go +++ b/message/message.go @@ -5,14 +5,6 @@ import ( watermillmessage "github.com/ThreeDotsLabs/watermill/message" ) -// The following constants are the keys on a watermill.Message Metadata where we store valuable and needed domain metadata. -// All contain the prefix `_asyncapi_eg_` so they can be unique-ish and human-readable. -// As a note: The term `eg` is a short version of Event-Gateway. -const ( - // MetadataChannel is the key used for storing the Channel in the message Metadata. - MetadataChannel = "_asyncapi_eg_channel" -) - // New creates a new watermillmessage.Message. It injects the Channel as Metadata. func New(payload []byte, channel string) *watermillmessage.Message { msg := watermillmessage.NewMessage(watermill.NewUUID(), payload) diff --git a/message/metadata.go b/message/metadata.go new file mode 100644 index 0000000..a195c1a --- /dev/null +++ b/message/metadata.go @@ -0,0 +1,28 @@ +package message + +import ( + "encoding/json" + + watermillmessage "github.com/ThreeDotsLabs/watermill/message" +) + +// The following constants are the keys on a watermill.Message Metadata where we store valuable and needed domain metadata. +// All contain the prefix `_asyncapi_eg_` so they can be unique-ish and human-readable. +// As a note: The term `eg` is a short version of Event-Gateway. +const ( + // MetadataChannel is the key used for storing the Channel in the message Metadata. + MetadataChannel = "_asyncapi_eg_channel" + + // MetadataValidationError is the key used for storing the Validation Error if applies. + MetadataValidationError = "_asyncapi_eg_validation_error" +) + +// UnmarshalMetadata extracts a value from the Message Metadata and unmarshals it to the given object. +func UnmarshalMetadata(msg *watermillmessage.Message, key string, unmarshalTo interface{}) error { + raw := msg.Metadata.Get(key) + if raw == "" { + return nil + } + + return json.Unmarshal([]byte(raw), unmarshalTo) +} diff --git a/message/test/utils.go b/message/test/utils.go new file mode 100644 index 0000000..04fdc60 --- /dev/null +++ b/message/test/utils.go @@ -0,0 +1,118 @@ +package test + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/ThreeDotsLabs/watermill" + watermillmessage "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" + "github.com/asyncapi/event-gateway/message" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" +) + +// ----------------------------------------- // +// Functions and helpers useful for testing // +// ----------------------------------------- // + +// NoopPublisher is a publisher that publishes nothing. +// Implements watermillmessage.Publisher interface. +type NoopPublisher struct{} + +func (n NoopPublisher) Publish(_ string, _ ...*watermillmessage.Message) error { + return nil +} + +func (n NoopPublisher) Close() error { + return nil +} + +// ReliablePublisher creates a publisher with a subscriber that will make the test fail if the number of published messages does not reach `messagesToAssert` before a given timeout. +// Should be only used on tests. +func ReliablePublisher(t *testing.T, topic string, messagesToAssert int, timeout time.Duration) (publisher *gochannel.GoChannel, done chan struct{}) { + p := gochannel.NewGoChannel(gochannel.Config{Persistent: true}, watermill.NopLogger{}) + msgs, err := p.Subscribe(context.Background(), topic) + if err != nil { + t.Fatal(err) + } + + done = make(chan struct{}) + var messagesReceived int + go func() { + defer func() { + done <- struct{}{} + }() + + timer := time.NewTimer(timeout) + for { + select { + case <-msgs: + messagesReceived++ + if messagesReceived == messagesToAssert { + return + } + case <-timer.C: + t.Errorf("Publisher was expected to publish at least %v, but only %v were published within %s", messagesToAssert, messagesReceived, timeout) + return + } + } + }() + + return p, done +} + +// AssertCalledHandlerFunc returns a new HandlerFunc based on a given one that will make the test fail if the given handler has not been called at least `times` before a given timeout. +func AssertCalledHandlerFunc(t *testing.T, h watermillmessage.HandlerFunc, times uint32, timeout time.Duration) (handler watermillmessage.HandlerFunc, done chan struct{}) { + var calledTimes uint32 + handler = func(msg *watermillmessage.Message) ([]*watermillmessage.Message, error) { + atomic.AddUint32(&calledTimes, +1) + return h(msg) + } + + done = make(chan struct{}) + go func() { + defer func() { + done <- struct{}{} + }() + timer := time.NewTimer(timeout) + for { + select { + case <-timer.C: + t.Errorf("Handler was expected to be called at least %v, but only %v times were called within %s", times, atomic.LoadUint32(&calledTimes), timeout) + return + default: + if atomic.LoadUint32(&calledTimes) == times { + return + } + } + } + }() + + return handler, done +} + +// NewRouterWithLogs creates a watermillmessage.Router with some needed test cleanup actions. It also configures a logger. +func NewRouterWithLogs(t *testing.T, logger *logrus.Logger) *watermillmessage.Router { + return newRouter(t, message.NewWatermillLogrusLogger(logger)) +} + +// NewRouter creates a watermillmessage.Router with some needed test cleanup actions. +func NewRouter(t *testing.T) *watermillmessage.Router { + return newRouter(t, watermill.NopLogger{}) +} + +func newRouter(t *testing.T, logger watermill.LoggerAdapter) *watermillmessage.Router { + r, err := watermillmessage.NewRouter(watermillmessage.RouterConfig{}, logger) + if err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + assert.NoError(t, r.Close()) + }) + + return r +} diff --git a/message/validation.go b/message/validation.go index 3078275..b7cfbb5 100644 --- a/message/validation.go +++ b/message/validation.go @@ -1,6 +1,7 @@ package message import ( + "encoding/json" "fmt" "strings" "time" @@ -16,35 +17,39 @@ type Validator func(*watermillmessage.Message) (*ValidationError, error) // ValidationError represents a message validation error. type ValidationError struct { - Timestamp time.Time `json:"ts"` - Msg *watermillmessage.Message `json:"msg"` - Errors []string `json:"errors"` + Timestamp time.Time `json:"ts"` + Errors []string `json:"errors"` } func (v ValidationError) Error() string { - return fmt.Sprintf("message %q is invalid. Validation errors: %s", v.Msg.UUID, strings.Join(v.Errors, " | ")) + return strings.Join(v.Errors, " | ") } // NewValidationError creates a new ValidationError. -func NewValidationError(msg *watermillmessage.Message, ts time.Time, errors ...string) *ValidationError { - return &ValidationError{Msg: msg, Timestamp: ts, Errors: errors} +func NewValidationError(ts time.Time, errors ...string) *ValidationError { + return &ValidationError{Timestamp: ts, Errors: errors} } -// ValidationErrorNotifier notifies whenever a ValidationError happens. -type ValidationErrorNotifier func(validationError *ValidationError) error - -// ValidationErrorToChanNotifier notifies to a given chan when a ValidationError happens. -func ValidationErrorToChanNotifier(errChan chan *ValidationError) ValidationErrorNotifier { - return func(validationError *ValidationError) error { - // TODO Blocking or non blocking? Shall we just fire and forget via goroutine instead? - errChan <- validationError +// ValidationErrorFromMessage extracts a ValidationError from the message Metadata if exists. +func ValidationErrorFromMessage(msg *watermillmessage.Message) (*ValidationError, error) { + validationErr := new(ValidationError) + return validationErr, UnmarshalMetadata(msg, MetadataValidationError, validationErr) +} - return nil +// ValidationErrorToMessage sets a ValidationError to the given message Metadata. +func ValidationErrorToMessage(msg *watermillmessage.Message, validationErr *ValidationError) error { + raw, err := json.Marshal(validationErr) + if err != nil { + return err } + + msg.Metadata.Set(MetadataValidationError, string(raw)) + + return nil } // JSONSchemaMessageValidator validates a message payload based on a map of Json Schema, where the key can be any identifier (depends on who implements it). -// For example, the identifier can be it's channel name, message ID, etc. +// For example, the identifier can be its channel name, message ID, etc. func JSONSchemaMessageValidator(messageSchemas map[string]gojsonschema.JSONLoader, idProvider func(msg *watermillmessage.Message) string) (Validator, error) { return func(msg *watermillmessage.Message) (*ValidationError, error) { msgID := idProvider(msg) @@ -64,7 +69,7 @@ func JSONSchemaMessageValidator(messageSchemas map[string]gojsonschema.JSONLoade errs[i] = result.Errors()[i].String() } - return NewValidationError(msg, time.Now(), errs...), nil + return NewValidationError(time.Now(), errs...), nil } return nil, nil diff --git a/message/validation_test.go b/message/validation_test.go index 63f3a42..86f7a88 100644 --- a/message/validation_test.go +++ b/message/validation_test.go @@ -68,6 +68,17 @@ func TestJsonSchemaMessageValidator(t *testing.T) { } } +func TestValidationErrorFromMessage(t *testing.T) { + msg := New([]byte{}, "channel") + + validationErr := &ValidationError{Errors: []string{"random error!"}} + assert.NoError(t, ValidationErrorToMessage(msg, validationErr)) + + fetchedValidationErr, err := ValidationErrorFromMessage(msg) + assert.NoError(t, err) + assert.ElementsMatch(t, validationErr.Errors, fetchedValidationErr.Errors) +} + func generateTestMessage() *watermillmessage.Message { msg := watermillmessage.NewMessage(watermill.NewUUID(), []byte(`Hello World!`)) msg.Metadata.Set(MetadataChannel, "the-channel") From 894f0dc4a2b2b80fcc087068cc74a53472339050 Mon Sep 17 00:00:00 2001 From: Sergio Moya <1083296+smoya@users.noreply.github.com> Date: Wed, 13 Oct 2021 11:38:08 +0200 Subject: [PATCH 2/4] Update kafka/config.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Maciej UrbaƄczyk --- kafka/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/config.go b/kafka/config.go index deaec4b..9d3bfb8 100644 --- a/kafka/config.go +++ b/kafka/config.go @@ -162,7 +162,7 @@ func (c *ProxyConfig) Validate() error { if c.MessageHandler == nil { logrus.Warn("There is no message handler configured") return nil - } else if c.MessagePublisher != nil && c.PublishToTopic == "" || c.MessagePublisher == nil && c.PublishToTopic != "" { + } else if (c.MessagePublisher != nil && c.PublishToTopic == "") || (c.MessagePublisher == nil && c.PublishToTopic != "") { return fmt.Errorf("MessagePublisher and PublishToTopic should be set together") } From 12ca6e3d54e09df346d83884aebf3e2cdb22117d Mon Sep 17 00:00:00 2001 From: Sergio Moya <1083296+smoya@users.noreply.github.com> Date: Wed, 13 Oct 2021 12:00:11 +0200 Subject: [PATCH 3/4] rename schemas --- asyncapi.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/asyncapi.yaml b/asyncapi.yaml index ed90dfe..1923824 100644 --- a/asyncapi.yaml +++ b/asyncapi.yaml @@ -57,7 +57,7 @@ components: summary: Message with a Validation Error. contentType: application/json payload: - $ref: "#/components/schemas/message" + $ref: "#/components/schemas/sentMessage" headers: type: object properties: @@ -65,9 +65,9 @@ components: type: string description: AsyncAPI Channel where the message was published to. _asyncapi_eg_validation_error: - $ref: '#/components/schemas/ValidationError' + $ref: '#/components/schemas/validationError' schemas: - message: + sentMessage: type: object properties: UUID: @@ -79,7 +79,7 @@ components: examples: - UUID: 'YXN5bmNhcGktd2FzLWhlcmU=' Payload: 'eyJsdW1lbnMiOiAid2hhdGV2ZXIifQ==' - ValidationError: + validationError: type: object properties: ts: From 13b60b0d73717b1c258eafca8e9301722d3f5b3c Mon Sep 17 00:00:00 2001 From: Sergio Moya <1083296+smoya@users.noreply.github.com> Date: Wed, 13 Oct 2021 12:32:56 +0200 Subject: [PATCH 4/4] fix tests --- message/test/utils.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/message/test/utils.go b/message/test/utils.go index 04fdc60..0c584da 100644 --- a/message/test/utils.go +++ b/message/test/utils.go @@ -51,7 +51,7 @@ func ReliablePublisher(t *testing.T, topic string, messagesToAssert int, timeout select { case <-msgs: messagesReceived++ - if messagesReceived == messagesToAssert { + if messagesReceived >= messagesToAssert { return } case <-timer.C: @@ -84,7 +84,7 @@ func AssertCalledHandlerFunc(t *testing.T, h watermillmessage.HandlerFunc, times t.Errorf("Handler was expected to be called at least %v, but only %v times were called within %s", times, atomic.LoadUint32(&calledTimes), timeout) return default: - if atomic.LoadUint32(&calledTimes) == times { + if atomic.LoadUint32(&calledTimes) >= times { return } }