Skip to content

Commit

Permalink
Replace session context with main context
Browse files Browse the repository at this point in the history
Had to go around this problem using mainContext as sessions get closed quite often when the total topics is huge.
Using context in struct instead of golang suggested approach of passing context around. Since sarama does not support passing context around.

IBM/sarama#1776
golang/go#22602
  • Loading branch information
alok87 committed Feb 18, 2021
1 parent f07eff9 commit f6755a3
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 5 deletions.
1 change: 1 addition & 0 deletions cmd/redshiftloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func run(cmd *cobra.Command, args []string) {
groupConfig,
redshiftloader.NewConsumer(
ready,
ctx,
groupConfig.Sarama,
redshifter,
),
Expand Down
11 changes: 7 additions & 4 deletions pkg/redshiftloader/consumer.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package redshiftloader

import (
"context"
"github.com/Shopify/sarama"
"github.com/practo/klog/v2"
"github.com/practo/tipoca-stream/redshiftsink/pkg/kafka"
"github.com/practo/tipoca-stream/redshiftsink/pkg/redshift"
)

func NewConsumer(ready chan bool, saramaConfig kafka.SaramaConfig, redshifter *redshift.Redshift) consumer {
func NewConsumer(ready chan bool, mainContext context.Context, saramaConfig kafka.SaramaConfig, redshifter *redshift.Redshift) consumer {
return consumer{
ready: ready,
mainContext: mainContext,
saramaConfig: saramaConfig,
redshifter: redshifter,

Expand All @@ -23,6 +25,7 @@ func NewConsumer(ready chan bool, saramaConfig kafka.SaramaConfig, redshifter *r
type consumer struct {
// Ready is used to signal the main thread about the readiness
ready chan bool
mainContext context.Context
loader *loader
saramaConfig kafka.SaramaConfig
redshifter *redshift.Redshift
Expand Down Expand Up @@ -57,14 +60,14 @@ func (c consumer) processMessage(
if c.loader.processor == nil {
c.loader.processor = newLoadProcessor(
message.Topic, message.Partition,
session, c.saramaConfig, c.redshifter,
session, c.mainContext, c.saramaConfig, c.redshifter,
)
}
// TODO: not sure added below for safety, it may not be required
c.loader.processor.session = session

select {
case <-c.loader.processor.session.Context().Done():
case <-c.mainContext.Done():
klog.Info("Graceful shutdown requested, not inserting in batch")
return nil
default:
Expand Down Expand Up @@ -93,7 +96,7 @@ func (c consumer) ConsumeClaim(session sarama.ConsumerGroupSession,
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
select {
case <-session.Context().Done():
case <-c.mainContext.Done():
klog.Infof(
"%s: Gracefully shutdown. Stopped taking new messages.",
claim.Topic(),
Expand Down
6 changes: 5 additions & 1 deletion pkg/redshiftloader/load_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type loadProcessor struct {
// session is required to commit the offsets on succesfull processing
session sarama.ConsumerGroupSession

mainContext context.Context

// s3Sink
s3sink *s3sink.S3Sink

Expand Down Expand Up @@ -82,6 +84,7 @@ type loadProcessor struct {

func newLoadProcessor(
topic string, partition int32, session sarama.ConsumerGroupSession,
mainContext context.Context,
saramaConfig kafka.SaramaConfig,
redshifter *redshift.Redshift) *loadProcessor {

Expand All @@ -102,6 +105,7 @@ func newLoadProcessor(
partition: partition,
autoCommit: saramaConfig.AutoCommit,
session: session,
mainContext: mainContext,
s3sink: sink,
messageTransformer: debezium.NewMessageTransformer(),
schemaTransformer: debezium.NewSchemaTransformer(
Expand All @@ -118,7 +122,7 @@ func newLoadProcessor(
// TODO: get rid of this https://github.com/herryg91/gobatch/issues/2
func (b *loadProcessor) ctxCancelled() bool {
select {
case <-b.session.Context().Done():
case <-b.mainContext.Done():
klog.Infof(
"topic:%s, batchId:%d, lastCommittedOffset:%d: Cancelled.\n",
b.topic, b.batchId, b.lastCommittedOffset,
Expand Down

0 comments on commit f6755a3

Please sign in to comment.