diff --git a/services/kafka/service.go b/services/kafka/service.go index 2a04bff2a..928d73f15 100644 --- a/services/kafka/service.go +++ b/services/kafka/service.go @@ -16,7 +16,7 @@ import ( "github.com/influxdata/kapacitor/keyvalue" "github.com/influxdata/kapacitor/server/vars" "github.com/pkg/errors" - kafka "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go" ) const ( @@ -51,7 +51,11 @@ type writer struct { wg sync.WaitGroup statsKey string - ticker *time.Ticker + + // time.Ticker doesn't expose any way to close its internal channel, + // so we have to use a side-channel to signal when it's been stopped. + ticker *time.Ticker + tickerDone chan struct{} } func (w *writer) Open() { @@ -71,6 +75,7 @@ func (w *writer) Open() { statsMap.Set(statWriteMessageCount, writeMessages) w.ticker = time.NewTicker(time.Second) + w.tickerDone = make(chan struct{}, 1) w.wg.Add(1) go func() { defer w.wg.Done() @@ -80,6 +85,7 @@ func (w *writer) Open() { func (w *writer) Close() { w.ticker.Stop() + close(w.tickerDone) vars.DeleteStatistic(w.statsKey) w.kafka.Close() w.wg.Wait() @@ -89,10 +95,15 @@ func (w *writer) Close() { // A read operation on the kafka.Writer.Stats() method causes the internal counters to be reset. // As a result we control all reads through this method. func (w *writer) pollStats() { - for range w.ticker.C { - stats := w.kafka.Stats() - atomic.AddInt64(&w.messageCount, stats.Messages) - atomic.AddInt64(&w.errorCount, stats.Errors) + for { + select { + case <-w.tickerDone: + return + case <-w.ticker.C: + stats := w.kafka.Stats() + atomic.AddInt64(&w.messageCount, stats.Messages) + atomic.AddInt64(&w.errorCount, stats.Errors) + } } } diff --git a/services/kafka/service_test.go b/services/kafka/service_test.go new file mode 100644 index 000000000..a3d059d46 --- /dev/null +++ b/services/kafka/service_test.go @@ -0,0 +1,34 @@ +package kafka_test + +import ( + "os" + "testing" + + "github.com/influxdata/kapacitor/services/diagnostic" + "github.com/influxdata/kapacitor/services/kafka" + "github.com/influxdata/kapacitor/services/kafka/kafkatest" + "github.com/stretchr/testify/require" +) + +func TestWriter_UpdateConfig(t *testing.T) { + ts, err := kafkatest.NewServer() + require.NoError(t, err) + defer ts.Close() + + c := kafka.NewConfig() + c.Enabled = true + c.Brokers = []string{ts.Addr.String()} + + cluster := kafka.NewCluster(c) + defer cluster.Close() + diag := diagnostic.NewService(diagnostic.NewConfig(), os.Stderr, os.Stdin) + require.NoError(t, diag.Open()) + defer diag.Close() + + // Write a message to generate a writer. + require.NoError(t, cluster.WriteMessage(diag.NewKafkaHandler(), "testTopic", []byte{1, 2, 3, 4}, []byte{1, 2, 3, 4})) + + // Update the config in a way that requires shutting down all active writers. + c.UseSSL = true + require.NoError(t, cluster.Update(c)) +}