From 8ad6288cb91f2292ba9db45255ff660cd186253e Mon Sep 17 00:00:00 2001
From: Alok Kumar Singh
Date: Thu, 13 Aug 2020 17:31:39 +0530
Subject: [PATCH] Gracefully shutdown all routines if SIGTERM received

More Info: https://github.com/Shopify/sarama/issues/1776
---
 redshiftsink/cmd/redshiftbatcher/main.go      |  16 +-
 redshiftsink/pkg/consumer/batcher.go          | 157 ++++++++++++------
 redshiftsink/pkg/consumer/consumer.go         |  42 +++--
 .../consumer/{client.go => consumer_group.go} |  43 +++--
 redshiftsink/pkg/consumer/manager.go          |  39 +++--
 5 files changed, 202 insertions(+), 95 deletions(-)
 rename redshiftsink/pkg/consumer/{client.go => consumer_group.go} (71%)

diff --git a/redshiftsink/cmd/redshiftbatcher/main.go b/redshiftsink/cmd/redshiftbatcher/main.go
index 75d1567e0..ff3d60701 100644
--- a/redshiftsink/cmd/redshiftbatcher/main.go
+++ b/redshiftsink/cmd/redshiftbatcher/main.go
@@ -39,9 +39,9 @@ func run(cmd *cobra.Command, args []string) {
 		os.Exit(1)
 	}
 
-	client, err := consumer.NewClient(config.Kafka, config.Sarama)
+	consumerGroup, err := consumer.NewConsumerGroup(config.Kafka, config.Sarama)
 	if err != nil {
-		klog.Errorf("Error creating kafka consumer client, exiting: %v\n", err)
+		klog.Errorf("Error creating kafka consumer group, exiting: %v\n", err)
 		os.Exit(1)
 	}
 
@@ -49,7 +49,7 @@ func run(cmd *cobra.Command, args []string) {
 	klog.Info("Succesfully created kafka client")
 
 	manager := consumer.NewManager(
-		client,
+		consumerGroup,
 		config.Kafka.TopicPrefixes,
 	)
 	wg := &sync.WaitGroup{}
@@ -68,16 +68,20 @@ func run(cmd *cobra.Command, args []string) {
 	case <-ctx.Done():
 		klog.Info("Context cancelled, bye bye!")
 	case <-sigterm:
-		klog.Info("Sigterm signal received, Goodbye till will meet again!")
+		klog.Info("SIGTERM signal received")
 	}
 	cancel()
 	wg.Wait()
 
-	if err = client.Close(); err != nil {
-		klog.Errorf("Error closing client: %v", err)
+	if err = consumerGroup.Close(); err != nil {
+		klog.Errorf("Error closing group: %v", err)
 		os.Exit(1)
 	}
+
+	klog.Info("Goodbye!")
 }
 
+// main => consumer.manager.Consume() => consumer.consumerGroup.Consume()
+// => sarama/consumer_group.Consume() => consumer.ConsumeClaim()
 func main() {
 	rootCmd.Execute()
 }
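The ordering above is the heart of the patch: cancel the shared context first, wait on the WaitGroup for the consumer goroutines to drain, and only then close the consumer group. A minimal, self-contained sketch of the same pattern (consumeLoop is a hypothetical stand-in for manager.Consume, not code from this repo):

```go
package main

import (
	"context"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// consumeLoop is a hypothetical stand-in for manager.Consume: it keeps
// doing rounds of work until the shared context is cancelled.
func consumeLoop(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return // graceful exit: unblocks wg.Wait() in main
		default:
			time.Sleep(100 * time.Millisecond) // one "round" of work
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go consumeLoop(ctx, wg)

	// Relay SIGINT/SIGTERM into a channel instead of letting the runtime
	// kill the process mid-batch.
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	<-sigterm

	cancel()  // ask every goroutine to wind down
	wg.Wait() // block until they have all returned
	// only now is it safe to Close() the consumer group
}
```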
diff --git a/redshiftsink/pkg/consumer/batcher.go b/redshiftsink/pkg/consumer/batcher.go
index fc156b9ea..cdd4b2852 100644
--- a/redshiftsink/pkg/consumer/batcher.go
+++ b/redshiftsink/pkg/consumer/batcher.go
@@ -2,6 +2,7 @@ package consumer
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"path/filepath"
 	"sync"
@@ -95,6 +96,10 @@ type batchProcessor struct {
 	// batchEndOffset is the ending offset of the batch
 	// this is useful only for logging and debugging purpose
 	batchEndOffset int64
+
+	// lastCommittedOffset tells the last committed offset
+	// this is helpful to log at the time of shutdown and can help in debugging
+	lastCommittedOffset int64
 }
 
 func newBatchProcessor(
@@ -121,6 +126,26 @@ func newBatchProcessor(
 	}
 }
 
+func (b *batchProcessor) setBatchId() {
+	if b.batchId == maxBatchId {
+		klog.V(5).Infof("topic:%s: Resetting batchId to zero.", b.topic)
+		b.batchId = 0
+	}
+
+	b.batchId += 1
+}
+
+func (b *batchProcessor) setS3key(topic string, partition int32, offset int64) {
+	b.s3Key = filepath.Join(
+		b.s3BucketDir,
+		topic,
+		fmt.Sprintf(
+			"%d_offset_%d_partition.csv",
+			offset,
+			partition),
+	)
+}
+
 func (b *batchProcessor) commitOffset(datas []interface{}) {
 	for _, data := range datas {
 		// TODO: this should not be required, fix the gobatch code
@@ -135,10 +160,28 @@ func (b *batchProcessor) commitOffset(datas []interface{}) {
 		// need to worry about this. Since the write to s3 again will
 		// just overwrite the same data.
 		b.session.Commit()
-		klog.V(5).Infof(
-			"topic=%s, message=%s: Processed\n",
+		// V(99) is an exception: logging every message is too verbose
+		// for the regular verbosity levels
+		klog.V(99).Infof(
+			"topic:%s, message:%s: Processed\n",
 			message.Topic, string(message.Value),
 		)
+		b.lastCommittedOffset = message.Offset
 	}
 }
 
+func (b *batchProcessor) shutdownInfo() {
+	klog.Infof(
+		"topic:%s: Batch processing gracefully shut down.\n",
+		b.topic,
+	)
+	if b.lastCommittedOffset == 0 {
+		klog.Infof("topic:%s: Nothing new was committed.\n", b.topic)
+	} else {
+		klog.Infof(
+			"topic:%s: lastCommittedOffset: %d.\n",
+			b.topic,
+			b.lastCommittedOffset,
+		)
+	}
+}
+
@@ -146,71 +189,79 @@ func (b *batchProcessor) signalLoad(bucket string, key string) {
 
 }
 
-func (b *batchProcessor) transform(data *sarama.ConsumerMessage) []byte {
-	return []byte{}
-}
+func (b *batchProcessor) processMessage(
+	message *sarama.ConsumerMessage, id int) {
 
-func (b *batchProcessor) setS3key(topic string, partition int32, offset int64) {
-	b.s3Key = filepath.Join(
-		b.s3BucketDir,
-		topic,
-		fmt.Sprintf(
-			"%d_offset_%d_partition.csv",
-			offset,
-			partition),
+	klog.V(5).Infof(
+		"topic:%s, batchId:%d id:%d: transforming\n",
+		b.topic, b.batchId, id,
+	)
+	// V(99) is an exception but required
+	klog.V(99).Infof(
+		"topic:%s, batchId:%d id:%d: message:%+v\n",
+		b.topic, b.batchId, id, message,
 	)
-}
 
-func (b *batchProcessor) newtransformedBuffer(datas []interface{}) {
-	b.s3Key = ""
+	// key is always made based on the first not nil message in the batch
+	if b.s3Key == "" {
+		b.setS3key(message.Topic, message.Partition, message.Offset)
 
-	for id, data := range datas {
-		// TODO: this should not be required, fix the gobatch code
-		if data == nil {
-			continue
-		}
-		message := data.(*sarama.ConsumerMessage)
-		klog.V(5).Infof(
-			"topic=%s, batchId=%d id=%d: transforming\n",
-			b.topic, b.batchId, id)
-		klog.V(5).Infof(
-			"topic=%s, batchId=%d id=%d: message=%+v\n",
-			b.topic, b.batchId, id, message)
-
-		// key is always made based on the first not nil message in the batch
-		if b.s3Key == "" {
-			b.setS3key(message.Topic, message.Partition, message.Offset)
-			klog.V(5).Infof("topic=%s, batchId=%d id=%d: s3Key=%s\n",
-				b.topic, b.batchId, id, b.s3Key)
-			b.batchStartOffset = message.Offset
-		}
+		klog.V(5).Infof("topic:%s, batchId:%d id:%d: s3Key:%s\n",
+			b.topic, b.batchId, id, b.s3Key,
+		)
 
-		b.bodyBuf.Write(message.Value)
-		b.bodyBuf.Write([]byte{'\n'})
-		klog.V(5).Infof(
-			"topic=%s, batchId=%d id=%d: transformed\n",
-			b.topic, b.batchId, id)
-		b.batchEndOffset = message.Offset
+		b.batchStartOffset = message.Offset
 	}
+
+	b.bodyBuf.Write(message.Value)
+	b.bodyBuf.Write([]byte{'\n'})
+
+	klog.V(5).Infof(
+		"topic:%s, batchId:%d id:%d: transformed\n",
+		b.topic, b.batchId, id,
+	)
+
+	b.batchEndOffset = message.Offset
 }
 
-func (b *batchProcessor) setBatchId() {
-	if b.batchId == maxBatchId {
-		klog.V(5).Infof("topic=%s: Resetting batchId to zero.")
-		b.batchId = 0
+// processBatch processes the batch and returns true when it completes;
+// it returns false when a graceful shutdown signal is captured midway,
+// which helps in cleanly shutting down the batch processing.
+func (b *batchProcessor) processBatch(
+	ctx context.Context, datas []interface{}) bool {
+
+	b.s3Key = ""
+	for id, data := range datas {
+		select {
+		case <-ctx.Done():
+			return false
+		default:
+			// TODO: fix gobatch for this
+			if data == nil {
+				continue
+			}
+			message := data.(*sarama.ConsumerMessage)
+			b.processMessage(message, id)
+		}
 	}
 
-	b.batchId += 1
+	return true
 }
 
 func (b *batchProcessor) process(workerID int, datas []interface{}) {
 	b.setBatchId()
-	klog.Infof("topic=%s, batchId=%d, size=%d: Processing...\n",
-		b.topic, b.batchId, len(datas))
+
+	klog.Infof("topic:%s, batchId:%d, size:%d: Processing...\n",
+		b.topic, b.batchId, len(datas),
+	)
 
 	// TODO: transform the debezium event into redshift commands
 	// and create a new buffer
-	b.newtransformedBuffer(datas)
+	done := b.processBatch(b.session.Context(), datas)
+	if !done {
+		b.shutdownInfo()
+		return
+	}
 
 	err := b.s3sink.Upload(b.s3Key, b.bodyBuf)
 	if err != nil {
@@ -218,11 +269,11 @@ func (b *batchProcessor) process(workerID int, datas []interface{}) {
 	}
 
 	klog.V(4).Infof(
-		"topic=%s, batchId=%d, startOffset: %d, endOffset: %d: Uploaded",
+		"topic:%s, batchId:%d, startOffset:%d, endOffset:%d: Uploaded",
 		b.topic, b.batchId, b.batchStartOffset, b.batchEndOffset,
 	)
 	klog.V(5).Infof(
-		"topic=%s, batchId=%d: Uploaded at: %s",
+		"topic:%s, batchId:%d: Uploaded: %s",
 		b.topic, b.batchId, b.s3Key,
 	)
 
 	b.commitOffset(datas)
 
 	klog.Infof(
-		"topic=%s, batchId=%d, startOffset: %d, endOffset: %d: Processed",
+		"topic:%s, batchId:%d, startOffset:%d, endOffset:%d: Processed",
 		b.topic, b.batchId, b.batchStartOffset, b.batchEndOffset,
 	)
 }
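processBatch re-checks the context between every message, so a SIGTERM that lands mid-batch abandons the partial buffer before any S3 upload or offset commit happens; offsets only move after a full pass. Reduced to its essentials, the guard looks like this (a sketch; processWithContext and handle are illustrative names, not this repo's API):

```go
package example

import "context"

// processWithContext drains items, bailing out between elements as soon
// as the context is cancelled. It returns true only after a full pass,
// which is the caller's licence to upload and commit (mirroring the
// processBatch contract above; handle is a hypothetical hook).
func processWithContext(
	ctx context.Context, items []interface{}, handle func(interface{})) bool {

	for _, item := range items {
		select {
		case <-ctx.Done():
			return false // shutdown requested: skip upload and commit
		default:
			if item == nil {
				continue // the batching library can hand over nil slots
			}
			handle(item)
		}
	}

	return true
}
```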
diff --git a/redshiftsink/pkg/consumer/consumer.go b/redshiftsink/pkg/consumer/consumer.go
index 32ea0221f..eef7e9167 100644
--- a/redshiftsink/pkg/consumer/consumer.go
+++ b/redshiftsink/pkg/consumer/consumer.go
@@ -42,7 +42,7 @@ func (c consumer) processMessage(
 	// load the batcher for the topic
 	ub, _ := c.batchers.Load(message.Topic)
 	if ub == nil {
-		klog.Fatalf("Error loading batcher for topic=%s\n", message.Topic)
+		klog.Fatalf("Error loading batcher for topic:%s\n", message.Topic)
 	}
 	b := ub.(*batcher)
 
@@ -62,7 +62,13 @@ func (c consumer) processMessage(
 		)
 	}
 
-	b.mbatch.Insert(message)
+	select {
+	case <-b.processor.session.Context().Done():
+		klog.Info("Graceful shutdown requested, not inserting in batch")
+		return nil
+	default:
+		b.mbatch.Insert(message)
+	}
 
 	return nil
 }
@@ -72,21 +78,37 @@ func (c consumer) ConsumeClaim(
 	session sarama.ConsumerGroupSession,
 	claim sarama.ConsumerGroupClaim) error {
 
-	klog.V(4).Infof("Starting to consume messages... Claims: %+v\n", session.Claims())
+	klog.V(2).Infof(
+		"ConsumeClaim for topic:%s, partition:%d, initialOffset:%d\n",
+		claim.Topic(),
+		claim.Partition(),
+		claim.InitialOffset(),
+	)
 
 	// NOTE:
 	// Do not move the code below to a goroutine.
 	// The `ConsumeClaim` itself is called within a goroutine, see:
 	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
 	for message := range claim.Messages() {
-		err := c.processMessage(session, message)
-		if err != nil {
-			klog.Errorf("Error processing: value=%s, timestamp=%v, topic=%s",
-				string(message.Value), message.Timestamp, message.Topic)
-			continue
+		select {
+		case <-session.Context().Done():
+			klog.Info(
+				"Graceful shutdown requested. Stopped taking new messages.")
+			return nil
+		default:
+			err := c.processMessage(session, message)
+			if err != nil {
+				klog.Errorf(
+					"Error processing: value:%s, timestamp:%v, topic:%s",
+					string(message.Value), message.Timestamp, message.Topic)
+				continue
+			}
 		}
 	}
 
-	klog.V(4).Info("Shutting down ConsumerClaim.")
-
+	klog.V(4).Infof(
+		"ConsumeClaim shut down for topic: %s, partition: %d\n",
+		claim.Topic(),
+		claim.Partition(),
+	)
 	return nil
 }
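sarama cancels session.Context() both on rebalance and when the context passed to Consume is cancelled, so the select inside the message loop is what turns a SIGTERM into a prompt, clean return from ConsumeClaim. A self-contained handler using the same guard might look like this (a sketch; the logging and marking policy here is illustrative, not this repo's):

```go
package example

import (
	"log"

	"github.com/Shopify/sarama"
)

// shutdownAwareHandler is a minimal sarama.ConsumerGroupHandler showing
// the context check used by ConsumeClaim above.
type shutdownAwareHandler struct{}

var _ sarama.ConsumerGroupHandler = shutdownAwareHandler{}

func (shutdownAwareHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (shutdownAwareHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (shutdownAwareHandler) ConsumeClaim(
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim) error {

	for message := range claim.Messages() {
		select {
		case <-session.Context().Done():
			// Rebalance or SIGTERM: stop taking new messages so the
			// session can exit and commit what was already marked.
			return nil
		default:
			log.Printf("topic:%s partition:%d offset:%d",
				message.Topic, message.Partition, message.Offset)
			session.MarkMessage(message, "")
		}
	}

	return nil
}
```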
diff --git a/redshiftsink/pkg/consumer/client.go b/redshiftsink/pkg/consumer/consumer_group.go
similarity index 71%
rename from redshiftsink/pkg/consumer/client.go
rename to redshiftsink/pkg/consumer/consumer_group.go
index 6ac6dfa66..0bfc5c534 100644
--- a/redshiftsink/pkg/consumer/client.go
+++ b/redshiftsink/pkg/consumer/consumer_group.go
@@ -11,8 +11,9 @@ import (
 	"github.com/practo/klog/v2"
 )
 
-type Client interface {
+type ConsumerGroup interface {
 	Topics() ([]string, error)
+	LastOffset(topic string, partition int32) (int64, error)
 	Consume(ctx context.Context, topics []string, ready chan bool) error
 	Close() error
 }
@@ -31,16 +32,18 @@ type SaramaConfig struct {
 	Log bool `yaml: log`
 }
 
-func NewClient(k KafkaConfig, s SaramaConfig) (Client, error) {
+func NewConsumerGroup(k KafkaConfig, s SaramaConfig) (ConsumerGroup, error) {
 	switch k.KafkaClient {
 	case "sarama":
-		return NewSaramaClient(k, s)
+		return NewSaramaConsumerGroup(k, s)
 	default:
 		return nil, fmt.Errorf("kafkaClient not supported: %v\n", k.KafkaClient)
 	}
 }
 
-func NewSaramaClient(k KafkaConfig, s SaramaConfig) (Client, error) {
+func NewSaramaConsumerGroup(
+	k KafkaConfig, s SaramaConfig) (ConsumerGroup, error) {
+
 	if s.Log {
 		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
 	}
@@ -83,21 +86,21 @@
 		return nil, fmt.Errorf("Error creating consumer group: %v\n", err)
 	}
 
-	cluster, err := sarama.NewConsumer(brokers, c)
+	client, err := sarama.NewClient(brokers, c)
 	if err != nil {
-		return nil, fmt.Errorf("Error creating consumer: %v\n", err)
+		return nil, fmt.Errorf("Error creating client: %v\n", err)
 	}
 
-	return &saramaClient{
-		cluster: cluster,
+	return &saramaConsumerGroup{
+		client:        client,
 		consumerGroup: consumerGroup,
 		consumer:      NewConsumer(),
 	}, nil
 }
 
-type saramaClient struct {
-	// cluster is required to get Kafka cluster related info like Topics
-	cluster sarama.Consumer
+type saramaConsumerGroup struct {
+	// client is required to get Kafka cluster related info like Topics
+	client sarama.Client
 
 	// consumerGroup uses consumer to consume records in kaafka topics
 	consumerGroup sarama.ConsumerGroup
 
@@ -107,11 +110,16 @@
 	consumer consumer
 }
 
-func (c *saramaClient) Topics() ([]string, error) {
-	return c.cluster.Topics()
+func (c *saramaConsumerGroup) Topics() ([]string, error) {
+	return c.client.Topics()
+}
+
+func (c *saramaConsumerGroup) LastOffset(
+	topic string, partition int32) (int64, error) {
+	return c.client.GetOffset(topic, partition, sarama.OffsetNewest)
 }
 
-func (c *saramaClient) Consume(
+func (c *saramaConsumerGroup) Consume(
 	ctx context.Context, topics []string, ready chan bool) error {
 
 	// create batchers
@@ -127,14 +135,17 @@
 	return c.consumerGroup.Consume(ctx, topics, c.consumer)
 }
 
-func (c *saramaClient) Close() error {
+func (c *saramaConsumerGroup) Close() error {
+	klog.V(4).Info("Closing consumerGroup.")
 	if err := c.consumerGroup.Close(); err != nil {
 		return err
 	}
 
-	if err := c.cluster.Close(); err != nil {
+	klog.V(4).Info("Closing client.")
+	if err := c.client.Close(); err != nil {
 		return err
 	}
 
+	klog.Info("Shutdown completed.")
 	return nil
 }
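The rename also makes the object split explicit: sarama.ConsumerGroup owns consumption, while a plain sarama.Client serves cluster metadata, which is all LastOffset needs; Close() has to release both. One detail worth knowing when reading those numbers: GetOffset with sarama.OffsetNewest returns the offset the next produced message will receive, not the offset of the last stored one. A minimal sketch of that metadata path (the broker address and topic name are placeholders):

```go
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// A bare sarama.Client is enough for metadata queries; no consumer
	// group is involved here.
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatalf("Error creating client: %v", err)
	}
	defer client.Close()

	// OffsetNewest yields the next offset to be assigned on the
	// partition; the last stored message sits at (lastOffset - 1).
	lastOffset, err := client.GetOffset("example-topic", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatalf("Error getting offset: %v", err)
	}
	fmt.Printf("topic:example-topic, partition:0, lastOffset:%d\n", lastOffset)
}
```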
diff --git a/redshiftsink/pkg/consumer/manager.go b/redshiftsink/pkg/consumer/manager.go
index 981be826c..9e28852a8 100644
--- a/redshiftsink/pkg/consumer/manager.go
+++ b/redshiftsink/pkg/consumer/manager.go
@@ -11,7 +11,7 @@ import (
 
 type Manager struct {
 	// consumer client, this is sarama now, can be kafka-go later
-	client Client
+	consumerGroup ConsumerGroup
 
 	// topicPrefixes is the list of topics to monitor
 	topicPrefixes []string
 
 }
 
 func NewManager(
-	client Client, topicPrefixes string) *Manager {
+	consumerGroup ConsumerGroup, topicPrefixes string) *Manager {
 
 	return &Manager{
-		client:        client,
+		consumerGroup: consumerGroup,
 		topicPrefixes: strings.Split(topicPrefixes, ","),
 		Ready:         make(chan bool),
 	}
 }
@@ -65,7 +65,7 @@ func (c *Manager) deepCopyTopics() []string {
 }
 
 func (c *Manager) refreshTopics() {
-	topics, err := c.client.Topics()
+	topics, err := c.consumerGroup.Topics()
 	if err != nil {
 		klog.Fatalf("Error getting topics, err=%v\n", err)
 	}
@@ -83,14 +83,31 @@ func (c *Manager) SyncTopics(
 		c.refreshTopics()
 
 		select {
-		case <-ticker.C:
-			continue
 		case <-ctx.Done():
 			return
+		case <-ticker.C:
+			continue
 		}
 	}
 }
 
+// printLastOffsets logs the last offset of every topic; TODO: it can
+// help in making a metric for the consumer lag later
+func (c *Manager) printLastOffsets() {
+	for _, topic := range c.topics {
+		lastOffset, err := c.consumerGroup.LastOffset(topic, 0)
+		if err != nil {
+			klog.Errorf("Unable to get offset, err:%v\n", err)
+			continue
+		}
+		klog.Infof(
+			"topic:%s, partition: 0, lastOffset: %d\n",
+			topic,
+			lastOffset,
+		)
+	}
+}
+
 func (c *Manager) Consume(ctx context.Context, wg *sync.WaitGroup) {
 	defer wg.Done()
 	for {
@@ -104,15 +121,17 @@ func (c *Manager) Consume(ctx context.Context, wg *sync.WaitGroup) {
 			continue
 		}
 
-		klog.V(4).Infof("Calling consume for %d topic(s)\n", len(topics))
-		err := c.client.Consume(ctx, topics, c.Ready)
+		c.printLastOffsets()
+
+		klog.V(4).Infof("Manager.Consume for %d topic(s)\n", len(topics))
+		err := c.consumerGroup.Consume(ctx, topics, c.Ready)
 		if err != nil {
 			klog.Fatalf("Error from consumer: %v", err)
 		}
-		// check if context was cancelled, signaling that the consumer should stop
+		// if the context was cancelled, the consumer should stop
 		if ctx.Err() != nil {
 			return
 		}
-		klog.V(5).Info("Done with Consume. It will be rerun.")
+		klog.V(4).Info("Manager.Consume completed a loop, will rerun")
 	}
}
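A subtlety Manager.Consume depends on: ConsumerGroup.Consume returns every time the group rebalances, so it must be called again in a loop, and the ctx.Err() check after each return is what separates "rebalance, go again" from "shutdown, stop". The core of that loop as a sketch (consumeUntilCancelled is an illustrative name, and the handler stands in for this package's consumer):

```go
package example

import (
	"context"

	"github.com/Shopify/sarama"
)

// consumeUntilCancelled re-enters Consume after every rebalance and only
// stops once the context is cancelled, mirroring Manager.Consume above.
func consumeUntilCancelled(
	ctx context.Context,
	group sarama.ConsumerGroup,
	topics []string,
	handler sarama.ConsumerGroupHandler) error {

	for {
		// Consume blocks for the lifetime of one session and returns
		// on rebalance or when ctx is cancelled.
		if err := group.Consume(ctx, topics, handler); err != nil {
			return err
		}
		if ctx.Err() != nil {
			return nil // graceful shutdown, not an error
		}
		// otherwise: rebalance; loop and join the new session
	}
}
```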