diff --git a/config.go b/config.go index ec2cb2b60..736d2de9f 100644 --- a/config.go +++ b/config.go @@ -197,22 +197,20 @@ type Config struct { // (MaxProcessingTime * ChanneBufferSize). Defaults to 100ms. MaxProcessingTime time.Duration - // The time interval between ticks of the fast checker. A value of 0 - // turns off the fast checker. - // If this is set to a non-zero value, then there will be periodic - // checks to see if messages have been written to the Messages channel. - // If a message has not been written to the Messages channel since the - // last tick of the fast checker, then the timer will be set. + // Whether or not to use the fast checker. The fast checker uses a + // ticker instead of a timer to implement the timeout functionality in + // (*partitionConsumer).responseFeeder. + // If a message is not written to the Messages channel between two ticks + // of the fast checker then a timeout is detected. // Using the fast checker should typically result in many fewer calls to // Timer functions resulting in a significant performance improvement if // many messages are being sent and timeouts are infrequent. // The disadvantage of using the fast checker is that timeouts will be // less accurate. That is, the effective timeout could be between - // `MaxProcessingTime` and `MaxProcessingTime + FastCheckerInterval`. - // For example, if `MaxProcessingTime` is 100ms and - // `FastCheckerInterval` is 10ms, then a delay of 108ms between two + // `MaxProcessingTime` and `2 * MaxProcessingTime`. For example, if + // `MaxProcessingTime` is 100ms then a delay of 180ms between two // messages being sent may not be recognized as a timeout. - FastCheckerInterval time.Duration + UseFastChecker bool // Return specifies what channels will be populated. If they are set to true, // you must read from them to prevent deadlock. @@ -294,7 +292,7 @@ func NewConfig() *Config { c.Consumer.Retry.Backoff = 2 * time.Second c.Consumer.MaxWaitTime = 250 * time.Millisecond c.Consumer.MaxProcessingTime = 100 * time.Millisecond - c.Consumer.FastCheckerInterval = 0 + c.Consumer.UseFastChecker = false c.Consumer.Return.Errors = false c.Consumer.Offsets.CommitInterval = 1 * time.Second c.Consumer.Offsets.Initial = OffsetNewest @@ -420,8 +418,6 @@ func (c *Config) Validate() error { return ConfigurationError("Consumer.MaxWaitTime must be >= 1ms") case c.Consumer.MaxProcessingTime <= 0: return ConfigurationError("Consumer.MaxProcessingTime must be > 0") - case c.Consumer.FastCheckerInterval < 0: - return ConfigurationError("Consumer.FastCheckerInterval must be >= 0") case c.Consumer.Retry.Backoff < 0: return ConfigurationError("Consumer.Retry.Backoff must be >= 0") case c.Consumer.Offsets.CommitInterval <= 0: diff --git a/consumer.go b/consumer.go index 8631ee6b8..2ce69b00b 100644 --- a/consumer.go +++ b/consumer.go @@ -441,67 +441,36 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 { func (child *partitionConsumer) responseFeeder() { var msgs []*ConsumerMessage msgSent := false - // Initialize timer without a pending send on its channel - expiryTimer := time.NewTimer(0) - <-expiryTimer.C - expiryTimerSet := false - - var fastCheckerChan <-chan (time.Time) - if child.conf.Consumer.FastCheckerInterval > 0 { - fastChecker := time.NewTicker(child.conf.Consumer.FastCheckerInterval) - defer fastChecker.Stop() - fastCheckerChan = fastChecker.C - } feederLoop: for response := range child.feeder { msgs, child.responseResult = child.parseResponse(response) + expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime) for i, msg := range msgs { - if child.conf.Consumer.FastCheckerInterval <= 0 { - expiryTimerSet = true - expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime) - } - messageSelect: select { case child.messages <- msg: msgSent = true - if expiryTimerSet { - // The timer was set and a message was sent, stop the - // timer and resume using the fast checker - if !expiryTimer.Stop() { - <-expiryTimer.C + case <-expiryTicker.C: + if !msgSent { + child.responseResult = errTimedOut + child.broker.acks.Done() + for _, msg = range msgs[i:] { + child.messages <- msg } - expiryTimerSet = false - } - // Periodically check if messages have been sent - case <-fastCheckerChan: - if msgSent { + child.broker.input <- child + continue feederLoop + } else { + // current message has not been sent, return to select + // statement msgSent = false - } else if !expiryTimerSet { - // No messages have been sent since the last tick, - // start the timer - expiryTimerSet = true - // If the fast checker is being used, then at least - // the time between two fast checker ticks has already - // passed since the last message was sent. - expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime - child.conf.Consumer.FastCheckerInterval) - } - // message has not been sent, return to select statement - goto messageSelect - case <-expiryTimer.C: - expiryTimerSet = false - child.responseResult = errTimedOut - child.broker.acks.Done() - for _, msg = range msgs[i:] { - child.messages <- msg + goto messageSelect } - child.broker.input <- child - continue feederLoop } } + expiryTicker.Stop() child.broker.acks.Done() } diff --git a/consumer_test.go b/consumer_test.go index b4c7e060f..546bd2f6b 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -822,7 +822,7 @@ func TestConsumerFastCheckerOff(t *testing.T) { config := NewConfig() config.ChannelBufferSize = 0 - config.Consumer.FastCheckerInterval = 0 + config.Consumer.UseFastChecker = false config.Consumer.MaxProcessingTime = 10 * time.Millisecond master, err := NewConsumer([]string{broker0.Addr()}, config) if err != nil { @@ -865,7 +865,7 @@ func TestConsumerFastCheckerOn(t *testing.T) { config := NewConfig() config.ChannelBufferSize = 0 - config.Consumer.FastCheckerInterval = 1 * time.Millisecond + config.Consumer.UseFastChecker = true config.Consumer.MaxProcessingTime = 10 * time.Millisecond master, err := NewConsumer([]string{broker0.Addr()}, config) if err != nil {