Skip to content

Commit

Permalink
Gracefully shutdown all routines if SIGTERM received
Browse files Browse the repository at this point in the history
More Info: IBM/sarama#1776
  • Loading branch information
alok87 committed Aug 13, 2020
1 parent 34ea214 commit 8ad6288
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 95 deletions.
16 changes: 10 additions & 6 deletions redshiftsink/cmd/redshiftbatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ func run(cmd *cobra.Command, args []string) {
os.Exit(1)
}

client, err := consumer.NewClient(config.Kafka, config.Sarama)
consumerGroup, err := consumer.NewConsumerGroup(config.Kafka, config.Sarama)
if err != nil {
klog.Errorf("Error creating kafka consumer client, exiting: %v\n", err)
klog.Errorf("Error creating kafka consumer group, exiting: %v\n", err)
os.Exit(1)
}

ctx, cancel := context.WithCancel(context.Background())
klog.Info("Succesfully created kafka client")

manager := consumer.NewManager(
client,
consumerGroup,
config.Kafka.TopicPrefixes,
)
wg := &sync.WaitGroup{}
Expand All @@ -68,16 +68,20 @@ func run(cmd *cobra.Command, args []string) {
case <-ctx.Done():
klog.Info("Context cancelled, bye bye!")
case <-sigterm:
klog.Info("Sigterm signal received, Goodbye till will meet again!")
klog.Info("Sigterm signal received")
}
cancel()
wg.Wait()
if err = client.Close(); err != nil {
klog.Errorf("Error closing client: %v", err)
if err = consumerGroup.Close(); err != nil {
klog.Errorf("Error closing group: %v", err)
os.Exit(1)
}

klog.Info("Goodbye!")
}

// main => consumer.manager.Consume() => consumer.consumerGroup.Consume()
// => sarama/consumer_group.Consume() => consumer.ConsumeClaim()
//
// main executes the root command. If the command returns an error the
// process exits with status 1 instead of silently reporting success, so
// shells and supervisors can detect the failure.
func main() {
	if err := rootCmd.Execute(); err != nil {
		os.Exit(1)
	}
}
157 changes: 104 additions & 53 deletions redshiftsink/pkg/consumer/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package consumer

import (
"bytes"
"context"
"fmt"
"path/filepath"
"sync"
Expand Down Expand Up @@ -95,6 +96,10 @@ type batchProcessor struct {
// batchEndOffset is the ending offset of the batch
// this is useful only for logging and debugging purpose
batchEndOffset int64

// lastCommittedOffset is the offset of the last committed message.
// It is logged at shutdown time and can help in debugging.
lastCommittedOffset int64
}

func newBatchProcessor(
Expand All @@ -121,6 +126,26 @@ func newBatchProcessor(
}
}

// setBatchId advances the processor's batch identifier by one.
//
// When the identifier has reached maxBatchId it is first reset to zero,
// so the identifier that follows maxBatchId is 1.
func (b *batchProcessor) setBatchId() {
	if b.batchId == maxBatchId {
		// The format string carries a %s verb, so the topic must be
		// supplied; otherwise the log line prints "%!s(MISSING)".
		klog.V(5).Infof("topic:%s: Resetting batchId to zero.", b.topic)
		b.batchId = 0
	}

	b.batchId++
}

// setS3key computes the S3 object key for a batch and stores it in b.s3Key.
//
// The key is laid out as
//
//	<s3BucketDir>/<topic>/<offset>_offset_<partition>_partition.csv
//
// and is joined with filepath.Join.
func (b *batchProcessor) setS3key(topic string, partition int32, offset int64) {
	fileName := fmt.Sprintf("%d_offset_%d_partition.csv", offset, partition)
	b.s3Key = filepath.Join(b.s3BucketDir, topic, fileName)
}

func (b *batchProcessor) commitOffset(datas []interface{}) {
for _, data := range datas {
// TODO: this should not be required, fix the gobatch code
Expand All @@ -135,94 +160,120 @@ func (b *batchProcessor) commitOffset(datas []interface{}) {
// need to worry about this. Since the write to s3 again will
// just overwrite the same data.
b.session.Commit()
klog.V(5).Infof(
"topic=%s, message=%s: Processed\n",
// 99 is an exception
klog.V(99).Infof(
"topic:%s, message:%s: Processed\n",
message.Topic, string(message.Value),
)
b.lastCommittedOffset = message.Offset
}
}

// shutdownInfo logs that batch processing for the topic has shut down and
// reports the last committed offset recorded on the processor.
//
// A lastCommittedOffset of zero is reported as "nothing new was committed";
// note that a commit at offset 0 is indistinguishable from that case.
func (b *batchProcessor) shutdownInfo() {
	klog.Infof("topic:%s: Batch processing gracefully shutdown.\n", b.topic)

	if b.lastCommittedOffset != 0 {
		klog.Infof(
			"topic:%s: lastCommittedOffset: %d.\n",
			b.topic, b.lastCommittedOffset,
		)
		return
	}

	klog.Infof("topic:%s: Nothing new was committed.\n", b.topic)
}

func (b *batchProcessor) signalLoad(bucket string, key string) {

}

func (b *batchProcessor) transform(data *sarama.ConsumerMessage) []byte {
return []byte{}
}
func (b *batchProcessor) processMessage(
message *sarama.ConsumerMessage, id int) {

func (b *batchProcessor) setS3key(topic string, partition int32, offset int64) {
b.s3Key = filepath.Join(
b.s3BucketDir,
topic,
fmt.Sprintf(
"%d_offset_%d_partition.csv",
offset,
partition),
klog.V(5).Infof(
"topic:%s, batchId:%d id:%d: transforming\n",
b.topic, b.batchId, id,
)
// V(99) is an exception but required
klog.V(99).Infof(
"topic:%s, batchId:%d id:%d: message:%+v\n",
b.topic, b.batchId, id, message,
)
}

func (b *batchProcessor) newtransformedBuffer(datas []interface{}) {
b.s3Key = ""
// key is always made based on the first not nil message in the batch
if b.s3Key == "" {
b.setS3key(message.Topic, message.Partition, message.Offset)

for id, data := range datas {
// TODO: this should not be required, fix the gobatch code
if data == nil {
continue
}
message := data.(*sarama.ConsumerMessage)
klog.V(5).Infof(
"topic=%s, batchId=%d id=%d: transforming\n",
b.topic, b.batchId, id)
klog.V(5).Infof(
"topic=%s, batchId=%d id=%d: message=%+v\n",
b.topic, b.batchId, id, message)

// key is always made based on the first not nil message in the batch
if b.s3Key == "" {
b.setS3key(message.Topic, message.Partition, message.Offset)
klog.V(5).Infof("topic=%s, batchId=%d id=%d: s3Key=%s\n",
b.topic, b.batchId, id, b.s3Key)
b.batchStartOffset = message.Offset
}
klog.V(5).Infof("topic:%s, batchId:%d id:%d: s3Key:%s\n",
b.topic, b.batchId, id, b.s3Key,
)

b.bodyBuf.Write(message.Value)
b.bodyBuf.Write([]byte{'\n'})
klog.V(5).Infof(
"topic=%s, batchId=%d id=%d: transformed\n",
b.topic, b.batchId, id)
b.batchEndOffset = message.Offset
b.batchStartOffset = message.Offset
}

b.bodyBuf.Write(message.Value)
b.bodyBuf.Write([]byte{'\n'})

klog.V(5).Infof(
"topic:%s, batchId:%d id:%d: transformed\n",
b.topic, b.batchId, id,
)

b.batchEndOffset = message.Offset
}

func (b *batchProcessor) setBatchId() {
if b.batchId == maxBatchId {
klog.V(5).Infof("topic=%s: Resetting batchId to zero.")
b.batchId = 0
// processBatch handles the batch processing and returns true if it completes;
// otherwise it returns false when a graceful shutdown signal is captured.
// This helps in cleanly shutting down the batch processing.
func (b *batchProcessor) processBatch(
ctx context.Context, datas []interface{}) bool {

b.s3Key = ""
for id, data := range datas {
select {
case <-ctx.Done():
return false
default:
// TODO: fix gobatch for this
if data == nil {
continue
}
message := data.(*sarama.ConsumerMessage)
b.processMessage(message, id)
}
}

b.batchId += 1
return true
}

func (b *batchProcessor) process(workerID int, datas []interface{}) {
b.setBatchId()
klog.Infof("topic=%s, batchId=%d, size=%d: Processing...\n",
b.topic, b.batchId, len(datas))

klog.Infof("topic:%s, batchId:%d, size:%d: Processing...\n",
b.topic, b.batchId, len(datas),
)

// TODO: transform the debezium event into redshift commands
// and create a new buffer
b.newtransformedBuffer(datas)
done := b.processBatch(b.session.Context(), datas)
if !done {
b.shutdownInfo()
return
}

err := b.s3sink.Upload(b.s3Key, b.bodyBuf)
if err != nil {
klog.Fatalf("Error writing to s3, err=%v\n", err)
}

klog.V(4).Infof(
"topic=%s, batchId=%d, startOffset: %d, endOffset: %d: Uploaded",
"topic:%s, batchId:%d, startOffset:%d, endOffset:%d: Uploaded",
b.topic, b.batchId, b.batchStartOffset, b.batchEndOffset,
)
klog.V(5).Infof(
"topic=%s, batchId=%d: Uploaded at: %s",
"topic:%s, batchId:%d: Uploaded: %s",
b.topic, b.batchId, b.s3Key,
)

Expand All @@ -231,7 +282,7 @@ func (b *batchProcessor) process(workerID int, datas []interface{}) {
b.commitOffset(datas)

klog.Infof(
"topic=%s, batchId=%d, startOffset: %d, endOffset: %d: Processed",
"topic:%s, batchId:%d, startOffset:%d, endOffset:%d: Processed",
b.topic, b.batchId, b.batchStartOffset, b.batchEndOffset,
)
}
42 changes: 32 additions & 10 deletions redshiftsink/pkg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (c consumer) processMessage(
// load the batcher for the topic
ub, _ := c.batchers.Load(message.Topic)
if ub == nil {
klog.Fatalf("Error loading batcher for topic=%s\n", message.Topic)
klog.Fatalf("Error loading batcher for topic:%s\n", message.Topic)
}
b := ub.(*batcher)

Expand All @@ -62,7 +62,13 @@ func (c consumer) processMessage(
)
}

b.mbatch.Insert(message)
select {
case <-b.processor.session.Context().Done():
klog.Info("Graceful shutdown requested, not inserting in batch")
return nil
default:
b.mbatch.Insert(message)
}

return nil
}
Expand All @@ -72,21 +78,37 @@ func (c consumer) ConsumeClaim(
session sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error {

klog.V(4).Infof("Starting to consume messages... Claims: %+v\n", session.Claims())
klog.V(2).Infof(
"ConsumeClaim for topic:%s, partition:%d, initalOffset:%d\n",
claim.Topic(),
claim.Partition(),
claim.InitialOffset(),
)
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
err := c.processMessage(session, message)
if err != nil {
klog.Errorf("Error processing: value=%s, timestamp=%v, topic=%s",
string(message.Value), message.Timestamp, message.Topic)
continue
select {
case <-session.Context().Done():
klog.Infof(
"Gracefully shutdown. Stopped taking new messages.")
return nil
default:
err := c.processMessage(session, message)
if err != nil {
klog.Errorf(
"Error processing: value:%s, timestamp:%v, topic:%s",
string(message.Value), message.Timestamp, message.Topic)
continue
}
}
}

klog.V(4).Info("Shutting down ConsumerClaim.")

klog.V(4).Infof(
"ConsumeClaim shut down for topic: %s, partition: %d\n",
claim.Topic(),
claim.Partition(),
)
return nil
}
Loading

0 comments on commit 8ad6288

Please sign in to comment.