From f6755a32fea2bfdfa2b38d75e4391ad714d7d54c Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Thu, 18 Feb 2021 22:18:11 +0530 Subject: [PATCH] Replace session context with main context Had to go around this problem using mainContext as sessions get closed quite often when the total topics is huge. Using context in struct instead of golang suggested approach of passing context around. Since sarama does not support passing context around. Shopify/sarama#1776 golang/go#22602 --- cmd/redshiftloader/main.go | 1 + pkg/redshiftloader/consumer.go | 11 +++++++---- pkg/redshiftloader/load_processor.go | 6 +++++- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/cmd/redshiftloader/main.go b/cmd/redshiftloader/main.go index dae7ece65..18606edf7 100644 --- a/cmd/redshiftloader/main.go +++ b/cmd/redshiftloader/main.go @@ -89,6 +89,7 @@ func run(cmd *cobra.Command, args []string) { groupConfig, redshiftloader.NewConsumer( ready, + ctx, groupConfig.Sarama, redshifter, ), diff --git a/pkg/redshiftloader/consumer.go b/pkg/redshiftloader/consumer.go index 2468a8217..82faa6a8c 100644 --- a/pkg/redshiftloader/consumer.go +++ b/pkg/redshiftloader/consumer.go @@ -1,15 +1,17 @@ package redshiftloader import ( + "context" "github.com/Shopify/sarama" "github.com/practo/klog/v2" "github.com/practo/tipoca-stream/redshiftsink/pkg/kafka" "github.com/practo/tipoca-stream/redshiftsink/pkg/redshift" ) -func NewConsumer(ready chan bool, saramaConfig kafka.SaramaConfig, redshifter *redshift.Redshift) consumer { +func NewConsumer(ready chan bool, mainContext context.Context, saramaConfig kafka.SaramaConfig, redshifter *redshift.Redshift) consumer { return consumer{ ready: ready, + mainContext: mainContext, saramaConfig: saramaConfig, redshifter: redshifter, @@ -23,6 +25,7 @@ func NewConsumer(ready chan bool, saramaConfig kafka.SaramaConfig, redshifter *r type consumer struct { // Ready is used to signal the main thread about the readiness ready chan bool + mainContext context.Context loader *loader saramaConfig kafka.SaramaConfig redshifter *redshift.Redshift @@ -57,14 +60,14 @@ func (c consumer) processMessage( if c.loader.processor == nil { c.loader.processor = newLoadProcessor( message.Topic, message.Partition, - session, c.saramaConfig, c.redshifter, + session, c.mainContext, c.saramaConfig, c.redshifter, ) } // TODO: not sure added below for safety, it may not be required c.loader.processor.session = session select { - case <-c.loader.processor.session.Context().Done(): + case <-c.mainContext.Done(): klog.Info("Graceful shutdown requested, not inserting in batch") return nil default: @@ -93,7 +96,7 @@ func (c consumer) ConsumeClaim(session sarama.ConsumerGroupSession, // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29 for message := range claim.Messages() { select { - case <-session.Context().Done(): + case <-c.mainContext.Done(): klog.Infof( "%s: Gracefully shutdown. Stopped taking new messages.", claim.Topic(), diff --git a/pkg/redshiftloader/load_processor.go b/pkg/redshiftloader/load_processor.go index 4b3e42eee..0705c4191 100644 --- a/pkg/redshiftloader/load_processor.go +++ b/pkg/redshiftloader/load_processor.go @@ -28,6 +28,8 @@ type loadProcessor struct { // session is required to commit the offsets on succesfull processing session sarama.ConsumerGroupSession + mainContext context.Context + // s3Sink s3sink *s3sink.S3Sink @@ -82,6 +84,7 @@ type loadProcessor struct { func newLoadProcessor( topic string, partition int32, session sarama.ConsumerGroupSession, + mainContext context.Context, saramaConfig kafka.SaramaConfig, redshifter *redshift.Redshift) *loadProcessor { @@ -102,6 +105,7 @@ func newLoadProcessor( partition: partition, autoCommit: saramaConfig.AutoCommit, session: session, + mainContext: mainContext, s3sink: sink, messageTransformer: debezium.NewMessageTransformer(), schemaTransformer: debezium.NewSchemaTransformer( @@ -118,7 +122,7 @@ func newLoadProcessor( // TODO: get rid of this https://github.com/herryg91/gobatch/issues/2 func (b *loadProcessor) ctxCancelled() bool { select { - case <-b.session.Context().Done(): + case <-b.mainContext.Done(): klog.Infof( "topic:%s, batchId:%d, lastCommittedOffset:%d: Cancelled.\n", b.topic, b.batchId, b.lastCommittedOffset,