Skip to content

Commit

Permalink
Refactorings after review nr 2
Browse files Browse the repository at this point in the history
  • Loading branch information
matskramer committed Oct 4, 2024
1 parent 0aa0056 commit 5d43b6a
Show file tree
Hide file tree
Showing 11 changed files with 282 additions and 271 deletions.
4 changes: 2 additions & 2 deletions cmd/apigw/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func main() {
var eventPublisher apiv1.EventPublisher
if cfg.Common.Kafka.Enabled {
var err error
eventPublisher, err = outbound.NewEventPublisher(ctx, cfg, tracer, log)
eventPublisher, err = outbound.New(ctx, cfg, tracer, log)
if err != nil {
panic(err)
}
Expand All @@ -83,7 +83,7 @@ func main() {
panic(err)
}

eventConsumer, err := inbound.NewEventConsumer(cfg, log, apiv1Client, tracer)
eventConsumer, err := inbound.New(ctx, cfg, log, apiv1Client, tracer)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/mockas/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func main() {
panic(err)
}

eventConsumer, err := inbound.NewEventConsumer(cfg, log, apiv1Client, tracer)
eventConsumer, err := inbound.New(ctx, cfg, log, apiv1Client, tracer)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/ui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func main() {
var eventPublisher apiv1.EventPublisher
if cfg.Common.Kafka.Enabled {
var err error
eventPublisher, err = outbound.NewEventPublisher(ctx, cfg, tracer, log)
eventPublisher, err = outbound.New(ctx, cfg, tracer, log)
if err != nil {
panic(err)
}
Expand Down
8 changes: 4 additions & 4 deletions internal/apigw/inbound/kafka_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (
"vc/pkg/trace"
)

func NewEventConsumer(cfg *model.Cfg, log *logger.Log, apiv1Client *apiv1.Client, tracer *trace.Tracer) (messagebroker.EventConsumer, error) {
func New(ctx context.Context, cfg *model.Cfg, log *logger.Log, apiv1Client *apiv1.Client, tracer *trace.Tracer) (messagebroker.EventConsumer, error) {
if !cfg.Common.Kafka.Enabled {
log.Info("Kafka disabled - no consumer created")
return nil, nil
}

kafkaMessageConsumerClient, err := kafka.NewMessageConsumerClient(kafka.CommonConsumerConfig(cfg), cfg.Common.Kafka.Brokers, log.New("kafka_consumer_client"))
client, err := kafka.NewConsumerClient(ctx, cfg, cfg.Common.Kafka.Brokers, log.New("kafka_consumer_client"))
if err != nil {
return nil, err
}
Expand All @@ -36,10 +36,10 @@ func NewEventConsumer(cfg *model.Cfg, log *logger.Log, apiv1Client *apiv1.Client
return &kafka.ConsumerGroupHandler{Handlers: handlersMap, Log: log.New("kafka_consumer_group_handler")}
}

if err := kafkaMessageConsumerClient.Start(handlerFactory, handlerConfigs); err != nil {
if err := client.Start(ctx, handlerFactory, handlerConfigs); err != nil {
return nil, err
}
return kafkaMessageConsumerClient, nil
return client, nil
}

func newUploadMessageHandler(log *logger.Log, apiv1 *apiv1.Client, tracer *trace.Tracer) *UploadMessageHandler {
Expand Down
22 changes: 11 additions & 11 deletions internal/apigw/outbound/kafka_message_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,23 @@ import (
"vc/pkg/trace"
)

type KafkaMessageProducer struct {
kafkaMessageProducerClient *kafka.MessageSyncProducerClient
type kafkaMessageProducer struct {
client *kafka.SyncProducerClient
}

func NewEventPublisher(ctx context.Context, cfg *model.Cfg, tracer *trace.Tracer, log *logger.Log) (apiv1.EventPublisher, error) {
kafkaMessageProducerClient, err := kafka.NewMessageSyncProducerClient(kafka.CommonProducerConfig(cfg), ctx, cfg, tracer, log.New("kafka_message_producer_client"))
func New(ctx context.Context, cfg *model.Cfg, tracer *trace.Tracer, log *logger.Log) (apiv1.EventPublisher, error) {
client, err := kafka.NewSyncProducerClient(ctx, kafka.CommonProducerConfig(cfg), cfg, tracer, log.New("kafka_message_producer_client"))
if err != nil {
return nil, err
}

return &KafkaMessageProducer{
kafkaMessageProducerClient: kafkaMessageProducerClient,
return &kafkaMessageProducer{
client: client,
}, nil

}

func (s *KafkaMessageProducer) Upload(uploadRequest *apiv1.UploadRequest) error {
func (s *kafkaMessageProducer) Upload(uploadRequest *apiv1.UploadRequest) error {
if uploadRequest == nil {
return errors.New("param uploadRequest is nil")
}
Expand All @@ -46,12 +46,12 @@ func (s *KafkaMessageProducer) Upload(uploadRequest *apiv1.UploadRequest) error
{Key: []byte(kafka.TypeOfStructInMessageValue), Value: typeHeader},
}

return s.kafkaMessageProducerClient.PublishMessage(kafka.TopicUpload, uploadRequest.Meta.DocumentID, jsonMarshaled, headers)
return s.client.PublishMessage(kafka.TopicUpload, uploadRequest.Meta.DocumentID, jsonMarshaled, headers)
}

func (s *KafkaMessageProducer) Close(ctx context.Context) error {
if s.kafkaMessageProducerClient != nil {
return s.kafkaMessageProducerClient.Close(ctx)
func (s *kafkaMessageProducer) Close(ctx context.Context) error {
if s.client != nil {
return s.client.Close(ctx)
}
return nil
}
1 change: 0 additions & 1 deletion internal/mockas/apiv1/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func (c *Client) MockNext(ctx context.Context, inData *MockNextRequest) (*MockNe
if err != nil {
return nil, err
}
c.log.Debug("mocknext", "mockUpload", mockUpload)

resp, err := c.uploader(ctx, mockUpload)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions internal/mockas/inbound/kafka_message_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (
"vc/pkg/trace"
)

func NewEventConsumer(cfg *model.Cfg, log *logger.Log, apiv1Client *apiv1.Client, tracer *trace.Tracer) (messagebroker.EventConsumer, error) {
func New(ctx context.Context, cfg *model.Cfg, log *logger.Log, apiv1Client *apiv1.Client, tracer *trace.Tracer) (messagebroker.EventConsumer, error) {
if !cfg.Common.Kafka.Enabled {
log.Info("Kafka disabled - no consumer created")
return nil, nil
}

kafkaMessageConsumerClient, err := kafka.NewMessageConsumerClient(kafka.CommonConsumerConfig(cfg), cfg.Common.Kafka.Brokers, log.New("kafka_consumer_client"))
client, err := kafka.NewConsumerClient(ctx, cfg, cfg.Common.Kafka.Brokers, log.New("kafka_consumer_client"))
if err != nil {
return nil, err
}
Expand All @@ -38,10 +38,10 @@ func NewEventConsumer(cfg *model.Cfg, log *logger.Log, apiv1Client *apiv1.Client
return &kafka.ConsumerGroupHandler{Handlers: handlersMap, Log: log.New("kafka_consumer_group_handler")}
}

if err := kafkaMessageConsumerClient.Start(handlerFactory, handlerConfigs); err != nil {
if err := client.Start(ctx, handlerFactory, handlerConfigs); err != nil {
return nil, err
}
return kafkaMessageConsumerClient, nil
return client, nil
}

type MockNextMessageHandler struct {
Expand Down Expand Up @@ -89,6 +89,6 @@ func newUploadMessageHandler(log *logger.Log, apiv1 *apiv1.Client, tracer *trace
}

func (h *UploadMessageHandler) HandleMessage(ctx context.Context, message *sarama.ConsumerMessage) error {
h.log.Debug("Consuming message to debug", "message.Key", string(message.Key), "message.Topic", message.Topic, "message.Value", string(message.Value))
h.log.Debug("Consuming message to debug", "message.Key", string(message.Key), "message.Topic", message.Topic)
return nil
}
22 changes: 11 additions & 11 deletions internal/ui/outbound/kafka_message_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@ import (
"vc/pkg/trace"
)

type KafkaMessageProducer struct {
kafkaMessageProducerClient *kafka.MessageSyncProducerClient
type kafkaMessageProducer struct {
client *kafka.SyncProducerClient
}

func NewEventPublisher(ctx context.Context, cfg *model.Cfg, tracer *trace.Tracer, log *logger.Log) (apiv1.EventPublisher, error) {
kafkaMessageProducerClient, err := kafka.NewMessageSyncProducerClient(kafka.CommonProducerConfig(cfg), ctx, cfg, tracer, log.New("kafka_message_producer_client"))
func New(ctx context.Context, cfg *model.Cfg, tracer *trace.Tracer, log *logger.Log) (apiv1.EventPublisher, error) {
client, err := kafka.NewSyncProducerClient(ctx, kafka.CommonProducerConfig(cfg), cfg, tracer, log.New("kafka_message_producer_client"))
if err != nil {
return nil, err
}
return &KafkaMessageProducer{
kafkaMessageProducerClient: kafkaMessageProducerClient,
return &kafkaMessageProducer{
client: client,
}, nil
}

func (s *KafkaMessageProducer) MockNext(mockNextRequest *apiv1.MockNextRequest) error {
func (s *kafkaMessageProducer) MockNext(mockNextRequest *apiv1.MockNextRequest) error {
if mockNextRequest == nil {
return errors.New("param mockNextRequest is nil")
}
Expand All @@ -44,12 +44,12 @@ func (s *KafkaMessageProducer) MockNext(mockNextRequest *apiv1.MockNextRequest)
{Key: []byte(kafka.TypeOfStructInMessageValue), Value: typeHeader},
}

return s.kafkaMessageProducerClient.PublishMessage(kafka.TopicMockNext, mockNextRequest.AuthenticSourcePersonId, jsonMarshaled, headers)
return s.client.PublishMessage(kafka.TopicMockNext, mockNextRequest.AuthenticSourcePersonId, jsonMarshaled, headers)
}

func (s *KafkaMessageProducer) Close(ctx context.Context) error {
if s.kafkaMessageProducerClient != nil {
return s.kafkaMessageProducerClient.Close(ctx)
func (s *kafkaMessageProducer) Close(ctx context.Context) error {
if s.client != nil {
return s.client.Close(ctx)
}
return nil
}
159 changes: 159 additions & 0 deletions pkg/messagebroker/kafka/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package kafka

import (
"context"
"errors"
"fmt"
"github.com/IBM/sarama"
"reflect"
"sync"
"time"
"vc/pkg/logger"
"vc/pkg/model"
)

// Kafka topic names and message-header keys shared by the producers and
// consumers in this package.
const (
	// TopicMockNext carries mock-next requests.
	TopicMockNext = "topic_mock_next"
	// TopicUpload carries document upload requests.
	TopicUpload = "topic_upload"
	// TypeOfStructInMessageValue is the message-header key whose value names
	// the Go type serialized in the message payload.
	TypeOfStructInMessageValue = "type_of_struct_in_value"
)

// HandlerConfig pairs a Kafka topic with the consumer group that reads it.
// Start launches one consume loop per HandlerConfig.
type HandlerConfig struct {
	Topic         string
	ConsumerGroup string
}

// Consumer is the contract for a Kafka consumer client: Start launches the
// consume loops and Close stops them and waits for shutdown.
//
// NOTE(review): the previous Start signature (no ctx, no handlerConfigs) did
// not match (*MessageConsumerClient).Start, so no type in this file could
// satisfy the interface; it is aligned with the concrete implementation.
type Consumer interface {
	Start(ctx context.Context, handlerFactory func(string) sarama.ConsumerGroupHandler, handlerConfigs []HandlerConfig) error
	Close(ctx context.Context) error
}

// MessageConsumerClient consumes Kafka messages via consumer groups.
// ATTENTION: Start max one instance of consumer client for each service to keep resource usage low
//
// NOTE(review): the previous `ctx context.Context` field was never assigned or
// read anywhere in this file and is removed — contexts are passed explicitly
// to Start/Close, per Go convention of not storing a Context in a struct.
type MessageConsumerClient struct {
	// SaramaConfig is populated by CommonConsumerConfig and validated in Start.
	SaramaConfig *sarama.Config
	brokers      []string
	cancel       context.CancelFunc // cancels the consume loops; set by Start
	wg           sync.WaitGroup     // one goroutine per HandlerConfig
	log          *logger.Log
}

// CommonConsumerConfig installs the shared sarama consumer configuration on
// the client: start from the oldest offset, use range partition assignment,
// and leave SASL disabled for now.
func (c *MessageConsumerClient) CommonConsumerConfig(cfg *model.Cfg) {
	//TODO(mk): set cfg from file - is now hardcoded
	//TODO(mk): enable security when consuming from Kafka
	saramaCfg := sarama.NewConfig()
	saramaCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	saramaCfg.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
	saramaCfg.Net.SASL.Enable = false
	c.SaramaConfig = saramaCfg
}

// NewConsumerClient creates a MessageConsumerClient for the given brokers and
// applies the common (currently hardcoded) consumer configuration.
// The ctx parameter is accepted for API symmetry with the producer client; it
// is not used here. The error return is always nil today but kept for
// forward compatibility.
func NewConsumerClient(ctx context.Context, cfg *model.Cfg, brokers []string, log *logger.Log) (*MessageConsumerClient, error) {
	// The zero values of SaramaConfig and wg are fine here: SaramaConfig is
	// populated by CommonConsumerConfig below (the previous &sarama.Config{}
	// was a dead store, immediately overwritten), and a zero sync.WaitGroup
	// is ready to use.
	client := &MessageConsumerClient{
		brokers: brokers,
		log:     log,
	}

	client.CommonConsumerConfig(cfg)

	return client, nil
}

// Start validates the sarama configuration and launches one consumer-group
// loop per handler config. Each loop runs in its own goroutine until the
// context created here is cancelled via Close (or the parent ctx ends).
func (c *MessageConsumerClient) Start(ctx context.Context, handlerFactory func(string) sarama.ConsumerGroupHandler, handlerConfigs []HandlerConfig) error {
	if err := c.SaramaConfig.Validate(); err != nil {
		return err
	}

	// BUGFIX: create the cancellable context ONCE, before the loop. The
	// previous code called context.WithCancel inside the loop, overwriting
	// c.cancel on every iteration, so Close could only stop the last consumer
	// group and the earlier cancel funcs leaked.
	cancelCtx, cancel := context.WithCancel(ctx)
	c.cancel = cancel

	for _, handlerConfig := range handlerConfigs {
		consumerGroup, err := sarama.NewConsumerGroup(c.brokers, handlerConfig.ConsumerGroup, c.SaramaConfig)
		if err != nil {
			cancel() // don't leak the context when bailing out early
			c.log.Error(err, "Error creating consumer group", "group", handlerConfig.ConsumerGroup)
			return err
		}
		c.log.Info("Started consumer group", "group", handlerConfig.ConsumerGroup)

		c.wg.Add(1)
		// Pass topic and group name as arguments so the goroutine does not
		// capture the loop variable (a data bug before Go 1.22).
		go func(group sarama.ConsumerGroup, topic, groupName string) {
			defer c.wg.Done()
			for {
				// A fresh handler per Consume call; Consume returns on
				// rebalance and must be called again in a loop.
				handler := handlerFactory(topic)
				if err := group.Consume(cancelCtx, []string{topic}, handler); err != nil {
					c.log.Error(err, "Error on consumer group", "group", groupName)
					//TODO(mk): use more advanced backoff algorithm?
					time.Sleep(1 * time.Second)
				}

				if cancelCtx.Err() != nil {
					return
				}
			}
		}(consumerGroup, handlerConfig.Topic, handlerConfig.ConsumerGroup)
	}
	return nil
}

// Close stops all consume loops started by Start and waits for their
// goroutines to finish. It is safe to call when Start was never invoked
// (previously c.cancel() would panic on a nil func in that case).
func (c *MessageConsumerClient) Close(ctx context.Context) error {
	if c.cancel != nil {
		c.cancel()
	}
	c.wg.Wait()
	c.log.Info("Closed")
	return nil
}

// MessageHandler processes a single consumed Kafka message.
// Implementations are registered per topic in ConsumerGroupHandler.Handlers.
type MessageHandler interface {
	HandleMessage(ctx context.Context, message *sarama.ConsumerMessage) error
}

// ConsumerGroupHandler implements sarama.ConsumerGroupHandler by dispatching
// each claimed message to the MessageHandler registered for its topic.
type ConsumerGroupHandler struct {
	Handlers map[string]MessageHandler // keyed by topic name
	Log      *logger.Log
}

// Setup runs before consuming starts; no per-session setup is needed.
func (cgh *ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

// Cleanup runs after the session ends; no per-session cleanup is needed.
func (cgh *ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim dispatches every message in the claim to the handler
// registered for the claim's topic. Handler errors are logged but not
// returned, so the group keeps consuming; each message is always marked as
// processed (at-most-once handling from the consumer's point of view).
func (cgh *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	if cgh.Handlers == nil {
		cgh.Log.Error(errors.New("No handlers defined"), "No Handlers for any topic")
		//TODO(mk): send to a general error topic?
		return nil
	}

	handler, exists := cgh.Handlers[claim.Topic()]
	if !exists {
		// BUGFIX: the topic was previously passed where the log message
		// belongs ("topic" as msg, claim.Topic() as a dangling key). Log it
		// as a message plus key/value pair, matching every other Error call
		// in this file.
		cgh.Log.Error(errors.New("No handler for topic"), "No handler for topic", "topic", claim.Topic())
		//TODO(mk): send to a general error topic?
		return nil
	}

	// Resolved once per claim; used only for the debug/metadata string below.
	handlerType := reflect.TypeOf(handler).String()

	for message := range claim.Messages() {
		var errMessage string

		if err := handler.HandleMessage(session.Context(), message); err != nil {
			cgh.Log.Error(err, "Error handling message", "topic", claim.Topic())
			//TODO(mk): more advanced retry/error handling including send to error topic if not OK after X number of retries
			errMessage = fmt.Sprintf("error handling message: %v", err)
		}

		info := fmt.Sprintf("message consumed by handler type: %s, topic: %s, partition: %d, offset: %d",
			handlerType,
			claim.Topic(),
			message.Partition,
			message.Offset,
		)
		if errMessage != "" {
			info = fmt.Sprintf("%s, error: %s", info, errMessage)
		}
		cgh.Log.Debug("Consumed message", "info", info)

		// The info string is stored as the offset-commit metadata.
		session.MarkMessage(message, info)
	}
	return nil
}
Loading

0 comments on commit 5d43b6a

Please sign in to comment.