Skip to content
This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

feat: configure extra message route to publish validationErrors to Kafka topic #61

Merged
merged 4 commits into from
Oct 13, 2021

Conversation

smoya
Copy link
Collaborator

@smoya smoya commented Oct 11, 2021

Description

Fixes #37

This PR adds support for configuring an extra publisher for incoming Kafka messages so they can be published again to another topic after validation (Same kafka cluster, different topic).

Now, validated messages that got a validation error can be re-enqueued to another kafka topic so they can be consumed by someone else and run forensics on their own. In this case, we will be exposing those by default to the websocket port by consuming directly from the second configured topic.

The asyncapi.yaml has also changed since the validation errors are now injected to the Metadata (headers) of the Message model, so they can be reused, rerouted, etc.

Related issue(s)
#37

type ValidationErrorNotifier func(validationError *ValidationError) error

// ValidationErrorToChanNotifier notifies to a given chan when a ValidationError happens.
func ValidationErrorToChanNotifier(errChan chan *ValidationError) ValidationErrorNotifier {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of this has been removed in favor of using publisher through watermill.io routes.

@@ -60,6 +56,14 @@ func NewProxy(c *ProxyConfig) (proxy.Proxy, error) {
_ = server.Server.Flags().Set("log-level", "debug")
}

if c.TLS != nil && c.TLS.Enable {
Copy link
Collaborator Author

@smoya smoya Oct 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was previously configured through the c ExtraConfig but now they have been moved into the config directly as it is important to be explicit. In that way, the TLS config can be reused into other parts.


opts := []kafka.ProxyOption{kafka.WithMessageHandler(handler.ValidateMessage(validator, false))}

if c.MessageValidation.PublishToKafkaTopic == "" {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the topic is set (from env var atm), then all the re-routing to publish the messages back into Kafka (but another topic) is configured.

Copy link
Collaborator

@magicmatatjahu magicmatatjahu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! :) Only one question, one suggestion :)

asyncapi.yaml Outdated Show resolved Hide resolved
kafka/config.go Outdated Show resolved Hide resolved
Co-authored-by: Maciej Urbańczyk <urbanczyk.maciej.95@gmail.com>
@smoya smoya requested a review from magicmatatjahu October 13, 2021 09:38
@smoya smoya merged commit dceaa39 into asyncapi-archived-repos:master Oct 13, 2021
@smoya smoya deleted the feat/rerouting branch October 13, 2021 10:55
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Websocket server should consume from a queue so all instances broadcast the same events
2 participants