Skip to content

Commit

Permalink
sink: refine close in kafka sink (#586)
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored May 20, 2020
1 parent 557a397 commit 13ccbe6
Showing 1 changed file with 19 additions and 4 deletions.
23 changes: 19 additions & 4 deletions cdc/sink/mqProducer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,12 @@ func (k *kafkaSaramaProducer) SyncBroadcastMessage(ctx context.Context, key []by
Partition: int32(i),
}
}
return errors.Trace(k.syncClient.SendMessages(msgs))
select {
case <-k.closeCh:
return nil
default:
return errors.Trace(k.syncClient.SendMessages(msgs))
}
}

func (k *kafkaSaramaProducer) Flush(ctx context.Context) error {
Expand Down Expand Up @@ -134,7 +139,12 @@ func (k *kafkaSaramaProducer) GetPartitionNum() int32 {
}

func (k *kafkaSaramaProducer) Close() error {
close(k.closeCh)
select {
case <-k.closeCh:
return nil
default:
close(k.closeCh)
}
err1 := k.syncClient.Close()
err2 := k.asyncClient.Close()
if err1 != nil {
Expand All @@ -147,7 +157,13 @@ func (k *kafkaSaramaProducer) Close() error {
}

func (k *kafkaSaramaProducer) run(ctx context.Context) error {
defer k.flushedReceiver.Stop()
defer func() {
k.flushedReceiver.Stop()
err := k.Close()
if err != nil {
log.Error("close kafkaSaramaProducer with error", zap.Error(err))
}
}()
for {
select {
case <-ctx.Done():
Expand All @@ -162,7 +178,6 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
atomic.StoreUint64(&k.partitionOffset[msg.Partition].flushed, flushedOffset)
k.flushedNotifier.Notify()
case err := <-k.asyncClient.Errors():
close(k.closeCh)
return errors.Annotate(err, "write kafka error")
}
}
Expand Down

0 comments on commit 13ccbe6

Please sign in to comment.