Skip to content

Commit

Permalink
Refactorings after review nr 3
Browse files Browse the repository at this point in the history
  • Loading branch information
matskramer committed Oct 8, 2024
1 parent 5d43b6a commit 5b3aa38
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 41 deletions.
19 changes: 9 additions & 10 deletions cmd/apigw/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@ func main() {
panic(err)
}

mainLog := log.New("main")

var eventPublisher apiv1.EventPublisher
if cfg.Common.Kafka.Enabled {
if cfg.IsAsyncEnabled(mainLog) {
var err error
eventPublisher, err = outbound.New(ctx, cfg, tracer, log)
services["eventPublisher"] = eventPublisher
if err != nil {
panic(err)
}
services["eventPublisher"] = eventPublisher
} else {
log.Info("EventPublisher disabled in config")
}

apiv1Client, err := apiv1.New(ctx, kvClient, dbService, simpleQueueService, tracer, cfg, log.New("apiv1"))
Expand All @@ -83,12 +83,12 @@ func main() {
panic(err)
}

eventConsumer, err := inbound.New(ctx, cfg, log, apiv1Client, tracer)
if err != nil {
panic(err)
}
if eventConsumer != nil {
if cfg.IsAsyncEnabled(mainLog) {
eventConsumer, err := inbound.New(ctx, cfg, log.New("eventConsumer"), apiv1Client, tracer)
services["eventConsumer"] = eventConsumer
if err != nil {
panic(err)
}
}

// Handle sigterm and await termChan signal
Expand All @@ -97,7 +97,6 @@ func main() {

<-termChan // Blocks here until interrupted

mainLog := log.New("main")
mainLog.Info("HALTING SIGNAL!")

for serviceName, service := range services {
Expand Down
12 changes: 6 additions & 6 deletions cmd/mockas/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func main() {
if err != nil {
panic(err)
}
mainLog := log.New("main")

tracer, err := trace.New(ctx, cfg, log, "vc", "mock_as")
if err != nil {
Expand All @@ -51,12 +50,13 @@ func main() {
panic(err)
}

eventConsumer, err := inbound.New(ctx, cfg, log, apiv1Client, tracer)
if err != nil {
panic(err)
}
if eventConsumer != nil {
mainLog := log.New("main")
if cfg.IsAsyncEnabled(mainLog) {
eventConsumer, err := inbound.New(ctx, cfg, log.New("eventConsumer"), apiv1Client, tracer)
services["eventConsumer"] = eventConsumer
if err != nil {
panic(err)
}
}

// Handle sigterm and await termChan signal
Expand Down
7 changes: 4 additions & 3 deletions cmd/ui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,16 @@ func main() {
panic(err)
}

mainLog := log.New("main")

var eventPublisher apiv1.EventPublisher
if cfg.Common.Kafka.Enabled {
if cfg.IsAsyncEnabled(mainLog) {
var err error
eventPublisher, err = outbound.New(ctx, cfg, tracer, log)
services["eventPublisher"] = eventPublisher
if err != nil {
panic(err)
}
services["eventPublisher"] = eventPublisher
} else {
log.Info("EventPublisher disabled in config")
}
Expand All @@ -76,7 +78,6 @@ func main() {

<-termChan // Blocks here until interrupted

mainLog := log.New("main")
mainLog.Info("HALTING SIGNAL!")

for serviceName, service := range services {
Expand Down
3 changes: 3 additions & 0 deletions internal/apigw/inbound/kafka_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"vc/pkg/trace"
)

// New creates a new Kafka event consumer instance used by apigw
func New(ctx context.Context, cfg *model.Cfg, log *logger.Log, apiv1Client *apiv1.Client, tracer *trace.Tracer) (messagebroker.EventConsumer, error) {
if !cfg.Common.Kafka.Enabled {
log.Info("Kafka disabled - no consumer created")
Expand Down Expand Up @@ -50,12 +51,14 @@ func newUploadMessageHandler(log *logger.Log, apiv1 *apiv1.Client, tracer *trace
}
}

// UploadMessageHandler handles Kafka messages of type UploadRequest
// by decoding them and forwarding them to the apigw apiv1 client.
type UploadMessageHandler struct {
	log    *logger.Log   // handler-scoped logger
	apiv1  *apiv1.Client // business logic entry point for uploads
	tracer *trace.Tracer // tracer for per-message spans
}

// HandleMessage handles Kafka message of type UploadRequest
func (h *UploadMessageHandler) HandleMessage(ctx context.Context, message *sarama.ConsumerMessage) error {
var uploadRequest apiv1.UploadRequest
if err := json.Unmarshal(message.Value, &uploadRequest); err != nil {
Expand Down
6 changes: 5 additions & 1 deletion internal/apigw/outbound/kafka_message_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ type kafkaMessageProducer struct {
client *kafka.SyncProducerClient
}

// New creates a new instance of a kafka event publisher used by apigw
func New(ctx context.Context, cfg *model.Cfg, tracer *trace.Tracer, log *logger.Log) (apiv1.EventPublisher, error) {
client, err := kafka.NewSyncProducerClient(ctx, kafka.CommonProducerConfig(cfg), cfg, tracer, log.New("kafka_message_producer_client"))
saramaConfig := kafka.CommonProducerConfig(cfg)
client, err := kafka.NewSyncProducerClient(ctx, saramaConfig, cfg, tracer, log.New("kafka_message_producer_client"))
if err != nil {
return nil, err
}
Expand All @@ -29,6 +31,7 @@ func New(ctx context.Context, cfg *model.Cfg, tracer *trace.Tracer, log *logger.

}

// Upload publish a UploadRequest message to a Kafka topic
func (s *kafkaMessageProducer) Upload(uploadRequest *apiv1.UploadRequest) error {
if uploadRequest == nil {
return errors.New("param uploadRequest is nil")
Expand All @@ -49,6 +52,7 @@ func (s *kafkaMessageProducer) Upload(uploadRequest *apiv1.UploadRequest) error
return s.client.PublishMessage(kafka.TopicUpload, uploadRequest.Meta.DocumentID, jsonMarshaled, headers)
}

// Close closes all resources used/started by the publisher
func (s *kafkaMessageProducer) Close(ctx context.Context) error {
if s.client != nil {
return s.client.Close(ctx)
Expand Down
5 changes: 4 additions & 1 deletion internal/mockas/inbound/kafka_message_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"vc/pkg/trace"
)

// New creates a new Kafka event consumer instance used by mockas
func New(ctx context.Context, cfg *model.Cfg, log *logger.Log, apiv1Client *apiv1.Client, tracer *trace.Tracer) (messagebroker.EventConsumer, error) {
if !cfg.Common.Kafka.Enabled {
log.Info("Kafka disabled - no consumer created")
Expand Down Expand Up @@ -44,6 +45,7 @@ func New(ctx context.Context, cfg *model.Cfg, log *logger.Log, apiv1Client *apiv
return client, nil
}

// MockNextMessageHandler struct that handles Kafka messages of type MockNextRequest
type MockNextMessageHandler struct {
log *logger.Log
apiv1 *apiv1.Client
Expand All @@ -58,6 +60,7 @@ func newMockNextMessageHandler(log *logger.Log, apiv1 *apiv1.Client, tracer *tra
}
}

// HandleMessage handles Kafka message of type MockNextRequest
func (h *MockNextMessageHandler) HandleMessage(ctx context.Context, message *sarama.ConsumerMessage) error {
var mockNextRequest apiv1.MockNextRequest
if err := json.Unmarshal(message.Value, &mockNextRequest); err != nil {
Expand All @@ -73,7 +76,7 @@ func (h *MockNextMessageHandler) HandleMessage(ctx context.Context, message *sar
return nil
}

// TODO(mk): REMOVE ME, JUST TO TEST A SECOND KAFKA CONSUMER GROUP FROM A DIFFERENT SERVICE
// TODO(mk): REMOVE ME BEFORE PRODUCTION, JUST TO TEST A SECOND KAFKA CONSUMER GROUP FROM A DIFFERENT SERVICE
type UploadMessageHandler struct {
log *logger.Log
apiv1 *apiv1.Client
Expand Down
6 changes: 5 additions & 1 deletion internal/ui/outbound/kafka_message_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ type kafkaMessageProducer struct {
client *kafka.SyncProducerClient
}

// New creates a new instance of a kafka event publisher used by ui
func New(ctx context.Context, cfg *model.Cfg, tracer *trace.Tracer, log *logger.Log) (apiv1.EventPublisher, error) {
client, err := kafka.NewSyncProducerClient(ctx, kafka.CommonProducerConfig(cfg), cfg, tracer, log.New("kafka_message_producer_client"))
saramaConfig := kafka.CommonProducerConfig(cfg)
client, err := kafka.NewSyncProducerClient(ctx, saramaConfig, cfg, tracer, log.New("kafka_message_producer_client"))
if err != nil {
return nil, err
}
Expand All @@ -27,6 +29,7 @@ func New(ctx context.Context, cfg *model.Cfg, tracer *trace.Tracer, log *logger.
}, nil
}

// MockNext publish a MockNext message to a Kafka topic
func (s *kafkaMessageProducer) MockNext(mockNextRequest *apiv1.MockNextRequest) error {
if mockNextRequest == nil {
return errors.New("param mockNextRequest is nil")
Expand All @@ -47,6 +50,7 @@ func (s *kafkaMessageProducer) MockNext(mockNextRequest *apiv1.MockNextRequest)
return s.client.PublishMessage(kafka.TopicMockNext, mockNextRequest.AuthenticSourcePersonId, jsonMarshaled, headers)
}

// Close closes all resources used/started by the publisher
func (s *kafkaMessageProducer) Close(ctx context.Context) error {
if s.client != nil {
return s.client.Close(ctx)
Expand Down
29 changes: 15 additions & 14 deletions pkg/messagebroker/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
TypeOfStructInMessageValue = "type_of_struct_in_value"
)

// HandlerConfig struct to define the Kafka topic and consumer group for a specific message handler
type HandlerConfig struct {
Topic string
ConsumerGroup string
Expand All @@ -38,30 +39,29 @@ type MessageConsumerClient struct {
log *logger.Log
}

// CommonConsumerConfig populates c.SaramaConfig with default Kafka consumer
// settings for vc. The cfg parameter is currently unused — values are
// hardcoded; see TODO below.
func (c *MessageConsumerClient) CommonConsumerConfig(cfg *model.Cfg) {
	//TODO(mk): set cfg from file - is now hardcoded
	//TODO(mk): enable security when consuming from Kafka
	c.SaramaConfig = sarama.NewConfig()
	// Start from the oldest available offset when no committed offset exists.
	c.SaramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
	c.SaramaConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
	// Security disabled for now; see TODO above.
	c.SaramaConfig.Net.SASL.Enable = false
}

func NewConsumerClient(ctx context.Context, cfg *model.Cfg, brokers []string, log *logger.Log) (*MessageConsumerClient, error) {
client := &MessageConsumerClient{
SaramaConfig: &sarama.Config{},
SaramaConfig: commonConsumerConfig(cfg),
brokers: brokers,
wg: sync.WaitGroup{},
log: log,
}

client.CommonConsumerConfig(cfg)

return client, nil
}

// commonConsumerConfig builds and returns a Kafka consumer configuration
// carrying the default (sane) settings used throughout vc.
func commonConsumerConfig(cfg *model.Cfg) *sarama.Config {
	//TODO(mk): set cfg from file - is now hardcoded
	conf := sarama.NewConfig()
	// Begin at the earliest available offset when the group has no committed offset.
	conf.Consumer.Offsets.Initial = sarama.OffsetOldest
	conf.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
	conf.Net.SASL.Enable = false
	//TODO(mk): enable and configure security when consuming from Kafka
	return conf
}

// Start starts the actual event consuming from specified kafka topics
func (c *MessageConsumerClient) Start(ctx context.Context, handlerFactory func(string) sarama.ConsumerGroupHandler, handlerConfigs []HandlerConfig) error {
if err := c.SaramaConfig.Validate(); err != nil {
return err
Expand Down Expand Up @@ -105,6 +105,7 @@ func (c *MessageConsumerClient) Close(ctx context.Context) error {
return nil
}

// MessageHandler definition of a generic Kafka message handler
type MessageHandler interface {
HandleMessage(ctx context.Context, message *sarama.ConsumerMessage) error
}
Expand Down
13 changes: 8 additions & 5 deletions pkg/messagebroker/kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,23 @@ import (
"vc/pkg/trace"
)

// SyncProducerClient is a synchronous Kafka message producer client.
// ATTENTION: Start max one instance of producer client for each service to keep resource usage low.
type SyncProducerClient struct {
	producer sarama.SyncProducer // underlying sarama synchronous producer
	cfg      *model.Cfg          // service configuration
	tracer   *trace.Tracer       // tracer for publish spans
	log      *logger.Log         // client-scoped logger
}

// NewSyncProducerClient creates a Kafka sync producer client
func NewSyncProducerClient(ctx context.Context, saramaConfig *sarama.Config, cfg *model.Cfg, tracer *trace.Tracer, log *logger.Log) (*SyncProducerClient, error) {
client := &SyncProducerClient{
cfg: cfg,
tracer: tracer,
log: log,
}

log.Info("Starting ...")
client.log.Info("Starting ...")

if saramaConfig == nil {
return nil, errors.New("param saramaConfig is nil")
Expand All @@ -40,24 +41,25 @@ func NewSyncProducerClient(ctx context.Context, saramaConfig *sarama.Config, cfg
return nil, err
}

log.Info("... started.")
client.log.Info("... started.")
return client, nil
}

// CommonProducerConfig returns a new Kafka producer configuration instance with sane defaults for vc.
func CommonProducerConfig(cfg *model.Cfg) *sarama.Config {
	//TODO(mk): set cfg from file - is now hardcoded
	saramaConfig := sarama.NewConfig()
	// SyncProducer requires Return.Successes = true.
	saramaConfig.Producer.Return.Successes = true
	// Wait for all in-sync replicas to ack each message.
	saramaConfig.Producer.RequiredAcks = sarama.WaitForAll
	// Idempotent producer; sarama requires Net.MaxOpenRequests = 1 for this.
	saramaConfig.Producer.Idempotent = true
	saramaConfig.Net.MaxOpenRequests = 1
	saramaConfig.Producer.Retry.Max = 3
	saramaConfig.Net.SASL.Enable = false
	//TODO(mk): enable and configure security when publishing to Kafka
	return saramaConfig
}

// Close close the producer
func (c *SyncProducerClient) Close(ctx context.Context) error {
err := c.producer.Close()
if err != nil {
Expand All @@ -68,6 +70,7 @@ func (c *SyncProducerClient) Close(ctx context.Context) error {
return nil
}

// PublishMessage publish a message to a Kafka topic
func (c *SyncProducerClient) PublishMessage(topic string, key string, json []byte, headers []sarama.RecordHeader) error {
//TODO(mk): create header data in this func, ie change func def.
message := &sarama.ProducerMessage{
Expand Down
12 changes: 12 additions & 0 deletions pkg/model/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package model

import (
"vc/pkg/logger"
)

// APIServer holds the api server configuration
type APIServer struct {
Addr string `yaml:"addr" validate:"required"`
Expand Down Expand Up @@ -220,3 +224,11 @@ type Cfg struct {
MockAS MockAS `yaml:"mock_as" validate:"omitempty"`
UI UI `yaml:"ui" validate:"omitempty"`
}

// IsAsyncEnabled reports whether asynchronous event handling (Kafka) is
// enabled in the configuration. When disabled, it logs an informational
// message so operators can see why no event publishers/consumers start.
//
// NOTE: the previous message said "EventPublisher disabled in config",
// which was misleading because this method also gates event consumers
// (and duplicated the caller's own publisher-specific log line).
func (cfg *Cfg) IsAsyncEnabled(log *logger.Log) bool {
	enabled := cfg.Common.Kafka.Enabled
	if !enabled {
		log.Info("Async event handling (Kafka) disabled in config")
	}
	return enabled
}

0 comments on commit 5b3aa38

Please sign in to comment.