diff --git a/cmd/apigw/main.go b/cmd/apigw/main.go index cb3c0034..7b7a19cf 100644 --- a/cmd/apigw/main.go +++ b/cmd/apigw/main.go @@ -61,16 +61,16 @@ func main() { panic(err) } + mainLog := log.New("main") + var eventPublisher apiv1.EventPublisher - if cfg.Common.Kafka.Enabled { + if cfg.IsAsyncEnabled(mainLog) { var err error eventPublisher, err = outbound.New(ctx, cfg, tracer, log) + services["eventPublisher"] = eventPublisher if err != nil { panic(err) } - services["eventPublisher"] = eventPublisher - } else { - log.Info("EventPublisher disabled in config") } apiv1Client, err := apiv1.New(ctx, kvClient, dbService, simpleQueueService, tracer, cfg, log.New("apiv1")) @@ -83,12 +83,12 @@ func main() { panic(err) } - eventConsumer, err := inbound.New(ctx, cfg, log, apiv1Client, tracer) - if err != nil { - panic(err) - } - if eventConsumer != nil { + if cfg.IsAsyncEnabled(mainLog) { + eventConsumer, err := inbound.New(ctx, cfg, log.New("eventConsumer"), apiv1Client, tracer) services["eventConsumer"] = eventConsumer + if err != nil { + panic(err) + } } // Handle sigterm and await termChan signal @@ -97,7 +97,6 @@ func main() { <-termChan // Blocks here until interrupted - mainLog := log.New("main") mainLog.Info("HALTING SIGNAL!") for serviceName, service := range services { diff --git a/cmd/mockas/main.go b/cmd/mockas/main.go index 2465a88e..73e307c9 100644 --- a/cmd/mockas/main.go +++ b/cmd/mockas/main.go @@ -33,7 +33,6 @@ func main() { if err != nil { panic(err) } - mainLog := log.New("main") tracer, err := trace.New(ctx, cfg, log, "vc", "mock_as") if err != nil { @@ -51,12 +50,13 @@ func main() { panic(err) } - eventConsumer, err := inbound.New(ctx, cfg, log, apiv1Client, tracer) - if err != nil { - panic(err) - } - if eventConsumer != nil { + mainLog := log.New("main") + if cfg.IsAsyncEnabled(mainLog) { + eventConsumer, err := inbound.New(ctx, cfg, log.New("eventConsumer"), apiv1Client, tracer) services["eventConsumer"] = eventConsumer + if err != nil { + 
panic(err) + } } // Handle sigterm and await termChan signal diff --git a/cmd/ui/main.go b/cmd/ui/main.go index 275658d2..b15b1d72 100644 --- a/cmd/ui/main.go +++ b/cmd/ui/main.go @@ -46,14 +46,16 @@ func main() { panic(err) } + mainLog := log.New("main") + var eventPublisher apiv1.EventPublisher - if cfg.Common.Kafka.Enabled { + if cfg.IsAsyncEnabled(mainLog) { var err error eventPublisher, err = outbound.New(ctx, cfg, tracer, log) + services["eventPublisher"] = eventPublisher if err != nil { panic(err) } - services["eventPublisher"] = eventPublisher } else { log.Info("EventPublisher disabled in config") } @@ -76,7 +78,6 @@ func main() { <-termChan // Blocks here until interrupted - mainLog := log.New("main") mainLog.Info("HALTING SIGNAL!") for serviceName, service := range services { diff --git a/internal/apigw/inbound/kafka_message_handler.go b/internal/apigw/inbound/kafka_message_handler.go index 3cc719b3..b4b75de5 100644 --- a/internal/apigw/inbound/kafka_message_handler.go +++ b/internal/apigw/inbound/kafka_message_handler.go @@ -12,6 +12,7 @@ import ( "vc/pkg/trace" ) +// New creates a new Kafka event consumer instance used by apigw func New(ctx context.Context, cfg *model.Cfg, log *logger.Log, apiv1Client *apiv1.Client, tracer *trace.Tracer) (messagebroker.EventConsumer, error) { if !cfg.Common.Kafka.Enabled { log.Info("Kafka disabled - no consumer created") @@ -50,12 +51,14 @@ func newUploadMessageHandler(log *logger.Log, apiv1 *apiv1.Client, tracer *trace } } +// UploadMessageHandler struct that handles Kafka messages of type UploadRequest type UploadMessageHandler struct { log *logger.Log apiv1 *apiv1.Client tracer *trace.Tracer } +// HandleMessage handles Kafka message of type UploadRequest func (h *UploadMessageHandler) HandleMessage(ctx context.Context, message *sarama.ConsumerMessage) error { var uploadRequest apiv1.UploadRequest if err := json.Unmarshal(message.Value, &uploadRequest); err != nil { diff --git 
a/internal/apigw/outbound/kafka_message_publisher.go b/internal/apigw/outbound/kafka_message_publisher.go index e618988a..3c7ec0ad 100644 --- a/internal/apigw/outbound/kafka_message_publisher.go +++ b/internal/apigw/outbound/kafka_message_publisher.go @@ -17,8 +17,10 @@ type kafkaMessageProducer struct { client *kafka.SyncProducerClient } +// New creates a new instance of a kafka event publisher used by apigw func New(ctx context.Context, cfg *model.Cfg, tracer *trace.Tracer, log *logger.Log) (apiv1.EventPublisher, error) { - client, err := kafka.NewSyncProducerClient(ctx, kafka.CommonProducerConfig(cfg), cfg, tracer, log.New("kafka_message_producer_client")) + saramaConfig := kafka.CommonProducerConfig(cfg) + client, err := kafka.NewSyncProducerClient(ctx, saramaConfig, cfg, tracer, log.New("kafka_message_producer_client")) if err != nil { return nil, err } @@ -29,6 +31,7 @@ func New(ctx context.Context, cfg *model.Cfg, tracer *trace.Tracer, log *logger. } +// Upload publishes a UploadRequest message to a Kafka topic func (s *kafkaMessageProducer) Upload(uploadRequest *apiv1.UploadRequest) error { if uploadRequest == nil { return errors.New("param uploadRequest is nil") @@ -49,6 +52,7 @@ func (s *kafkaMessageProducer) Upload(uploadRequest *apiv1.UploadRequest) error return s.client.PublishMessage(kafka.TopicUpload, uploadRequest.Meta.DocumentID, jsonMarshaled, headers) } +// Close closes all resources used/started by the publisher func (s *kafkaMessageProducer) Close(ctx context.Context) error { if s.client != nil { return s.client.Close(ctx) diff --git a/internal/mockas/inbound/kafka_message_handlers.go b/internal/mockas/inbound/kafka_message_handlers.go index 6eb8d60d..26333db7 100644 --- a/internal/mockas/inbound/kafka_message_handlers.go +++ b/internal/mockas/inbound/kafka_message_handlers.go @@ -12,6 +12,7 @@ import ( "vc/pkg/trace" ) +// New creates a new Kafka event consumer instance used by mockas func New(ctx context.Context, cfg *model.Cfg, log
*logger.Log, apiv1Client *apiv1.Client, tracer *trace.Tracer) (messagebroker.EventConsumer, error) { if !cfg.Common.Kafka.Enabled { log.Info("Kafka disabled - no consumer created") @@ -44,6 +45,7 @@ func New(ctx context.Context, cfg *model.Cfg, log *logger.Log, apiv1Client *apiv return client, nil } +// MockNextMessageHandler struct that handles Kafka messages of type MockNextRequest type MockNextMessageHandler struct { log *logger.Log apiv1 *apiv1.Client @@ -58,6 +60,7 @@ func newMockNextMessageHandler(log *logger.Log, apiv1 *apiv1.Client, tracer *tra } } +// HandleMessage handles Kafka message of type MockNextRequest func (h *MockNextMessageHandler) HandleMessage(ctx context.Context, message *sarama.ConsumerMessage) error { var mockNextRequest apiv1.MockNextRequest if err := json.Unmarshal(message.Value, &mockNextRequest); err != nil { @@ -73,7 +76,7 @@ func (h *MockNextMessageHandler) HandleMessage(ctx context.Context, message *sar return nil } -// TODO(mk): REMOVE ME, JUST TO TEST A SECOND KAFKA CONSUMER GROUP FROM A DIFFERENT SERVICE +// TODO(mk): REMOVE ME BEFORE PRODUCTION, JUST TO TEST A SECOND KAFKA CONSUMER GROUP FROM A DIFFERENT SERVICE type UploadMessageHandler struct { log *logger.Log apiv1 *apiv1.Client diff --git a/internal/ui/outbound/kafka_message_publisher.go b/internal/ui/outbound/kafka_message_publisher.go index cd7fc899..f9e35764 100644 --- a/internal/ui/outbound/kafka_message_publisher.go +++ b/internal/ui/outbound/kafka_message_publisher.go @@ -17,8 +17,10 @@ type kafkaMessageProducer struct { client *kafka.SyncProducerClient } +// New creates a new instance of a kafka event publisher used by ui func New(ctx context.Context, cfg *model.Cfg, tracer *trace.Tracer, log *logger.Log) (apiv1.EventPublisher, error) { - client, err := kafka.NewSyncProducerClient(ctx, kafka.CommonProducerConfig(cfg), cfg, tracer, log.New("kafka_message_producer_client")) + saramaConfig := kafka.CommonProducerConfig(cfg) + client, err := 
kafka.NewSyncProducerClient(ctx, saramaConfig, cfg, tracer, log.New("kafka_message_producer_client")) if err != nil { return nil, err } @@ -27,6 +29,7 @@ func New(ctx context.Context, cfg *model.Cfg, tracer *trace.Tracer, log *logger. }, nil } +// MockNext publishes a MockNext message to a Kafka topic func (s *kafkaMessageProducer) MockNext(mockNextRequest *apiv1.MockNextRequest) error { if mockNextRequest == nil { return errors.New("param mockNextRequest is nil") @@ -47,6 +50,7 @@ func (s *kafkaMessageProducer) MockNext(mockNextRequest *apiv1.MockNextRequest) return s.client.PublishMessage(kafka.TopicMockNext, mockNextRequest.AuthenticSourcePersonId, jsonMarshaled, headers) } +// Close closes all resources used/started by the publisher func (s *kafkaMessageProducer) Close(ctx context.Context) error { if s.client != nil { return s.client.Close(ctx) diff --git a/pkg/messagebroker/kafka/consumer.go b/pkg/messagebroker/kafka/consumer.go index 7c9e9128..d16167b1 100644 --- a/pkg/messagebroker/kafka/consumer.go +++ b/pkg/messagebroker/kafka/consumer.go @@ -18,6 +18,7 @@ const ( TypeOfStructInMessageValue = "type_of_struct_in_value" ) +// HandlerConfig struct to define the Kafka topic and consumer group for a specific message handler type HandlerConfig struct { Topic string ConsumerGroup string @@ -38,30 +39,29 @@ type MessageConsumerClient struct { log *logger.Log } -func (c *MessageConsumerClient) CommonConsumerConfig(cfg *model.Cfg) { - //TODO(mk): set cfg from file - is now hardcoded - //TODO(mk): enable security when consumting from Kafka - c.SaramaConfig = sarama.NewConfig() - c.SaramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest - c.SaramaConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()} - c.SaramaConfig.Net.SASL.Enable = false - // ...
- //return SaramaConfig -} - func NewConsumerClient(ctx context.Context, cfg *model.Cfg, brokers []string, log *logger.Log) (*MessageConsumerClient, error) { client := &MessageConsumerClient{ - SaramaConfig: &sarama.Config{}, + SaramaConfig: commonConsumerConfig(cfg), brokers: brokers, wg: sync.WaitGroup{}, log: log, } - client.CommonConsumerConfig(cfg) - return client, nil } +// commonConsumerConfig returns a new Kafka consumer configuration instance with sane defaults for vc. +func commonConsumerConfig(cfg *model.Cfg) *sarama.Config { +	//TODO(mk): set cfg from file - is now hardcoded + saramaConfig := sarama.NewConfig() + saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest + saramaConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()} + saramaConfig.Net.SASL.Enable = false + //TODO(mk): enable and configure security when consuming from Kafka + return saramaConfig +} + +// Start starts the actual event consuming from specified kafka topics func (c *MessageConsumerClient) Start(ctx context.Context, handlerFactory func(string) sarama.ConsumerGroupHandler, handlerConfigs []HandlerConfig) error { if err := c.SaramaConfig.Validate(); err != nil { return err @@ -105,6 +105,7 @@ func (c *MessageConsumerClient) Close(ctx context.Context) error { return nil } +// MessageHandler definition of a generic Kafka message handler type MessageHandler interface { HandleMessage(ctx context.Context, message *sarama.ConsumerMessage) error } diff --git a/pkg/messagebroker/kafka/producer.go b/pkg/messagebroker/kafka/producer.go index d1be92b7..28e0f5a9 100644 --- a/pkg/messagebroker/kafka/producer.go +++ b/pkg/messagebroker/kafka/producer.go @@ -10,7 +10,7 @@ import ( "vc/pkg/trace" ) -// MessageConsumerClient ATTENTION: Start max one instance of consumer client for each service to keep resource usage low +// SyncProducerClient ATTENTION: Start max one instance of Kafka producer client for each service to keep resource
usage low type SyncProducerClient struct { producer sarama.SyncProducer cfg *model.Cfg @@ -18,6 +18,7 @@ type SyncProducerClient struct { log *logger.Log } +// NewSyncProducerClient creates a Kafka sync producer client func NewSyncProducerClient(ctx context.Context, saramaConfig *sarama.Config, cfg *model.Cfg, tracer *trace.Tracer, log *logger.Log) (*SyncProducerClient, error) { client := &SyncProducerClient{ cfg: cfg, @@ -25,7 +26,7 @@ func NewSyncProducerClient(ctx context.Context, saramaConfig *sarama.Config, cfg log: log, } - log.Info("Starting ...") + client.log.Info("Starting ...") if saramaConfig == nil { return nil, errors.New("param saramaConfig is nil") @@ -40,13 +41,13 @@ func NewSyncProducerClient(ctx context.Context, saramaConfig *sarama.Config, cfg return nil, err } - log.Info("... started.") + client.log.Info("... started.") return client, nil } +// CommonProducerConfig returns a new Kafka producer configuration instance with sane defaults for vc. func CommonProducerConfig(cfg *model.Cfg) *sarama.Config { //TODO(mk): set cfg from file - is now hardcoded - //TODO(mk: enable security when publishing to Kafka saramaConfig := sarama.NewConfig() saramaConfig.Producer.Return.Successes = true saramaConfig.Producer.RequiredAcks = sarama.WaitForAll @@ -54,10 +55,11 @@ func CommonProducerConfig(cfg *model.Cfg) *sarama.Config { saramaConfig.Net.MaxOpenRequests = 1 saramaConfig.Producer.Retry.Max = 3 saramaConfig.Net.SASL.Enable = false - // ... 
+ //TODO(mk): enable and configure security when publishing to Kafka return saramaConfig } +// Close closes the producer func (c *SyncProducerClient) Close(ctx context.Context) error { err := c.producer.Close() if err != nil { @@ -68,6 +70,7 @@ func (c *SyncProducerClient) Close(ctx context.Context) error { return nil } +// PublishMessage publishes a message to a Kafka topic func (c *SyncProducerClient) PublishMessage(topic string, key string, json []byte, headers []sarama.RecordHeader) error { //TODO(mk): create header data in this func, ie change func def. message := &sarama.ProducerMessage{ diff --git a/pkg/model/config.go b/pkg/model/config.go index 43ff8c98..bfcb22a4 100644 --- a/pkg/model/config.go +++ b/pkg/model/config.go @@ -1,5 +1,9 @@ package model +import ( + "vc/pkg/logger" +) + // APIServer holds the api server configuration type APIServer struct { Addr string `yaml:"addr" validate:"required"` @@ -220,3 +224,12 @@ type Cfg struct { MockAS MockAS `yaml:"mock_as" validate:"omitempty"` UI UI `yaml:"ui" validate:"omitempty"` } + +// IsAsyncEnabled reports whether asynchronous (Kafka) event processing is enabled; logs an info message when disabled +func (cfg *Cfg) IsAsyncEnabled(log *logger.Log) bool { + enabled := cfg.Common.Kafka.Enabled + if !enabled { + log.Info("EventPublisher disabled in config") + } + return enabled +}