diff --git a/reader.go b/reader.go index 08f4f5b04..1eea3d1b6 100644 --- a/reader.go +++ b/reader.go @@ -400,6 +400,9 @@ type ReaderConfig struct { // Default: 10s MaxWait time.Duration + // MaxSafetyTimeout + MaxSafetyTimeout time.Duration + // ReadLagInterval sets the frequency at which the reader lag is updated. // Setting this field to a negative value disables lag reporting. ReadLagInterval time.Duration @@ -1203,22 +1206,23 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) { defer join.Done() (&reader{ - dialer: r.config.Dialer, - logger: r.config.Logger, - errorLogger: r.config.ErrorLogger, - brokers: r.config.Brokers, - topic: key.topic, - partition: int(key.partition), - minBytes: r.config.MinBytes, - maxBytes: r.config.MaxBytes, - maxWait: r.config.MaxWait, - backoffDelayMin: r.config.ReadBackoffMin, - backoffDelayMax: r.config.ReadBackoffMax, - version: r.version, - msgs: r.msgs, - stats: r.stats, - isolationLevel: r.config.IsolationLevel, - maxAttempts: r.config.MaxAttempts, + dialer: r.config.Dialer, + logger: r.config.Logger, + errorLogger: r.config.ErrorLogger, + brokers: r.config.Brokers, + topic: key.topic, + partition: int(key.partition), + minBytes: r.config.MinBytes, + maxBytes: r.config.MaxBytes, + maxWait: r.config.MaxWait, + maxSafetyTimeout: r.config.MaxSafetyTimeout, + backoffDelayMin: r.config.ReadBackoffMin, + backoffDelayMax: r.config.ReadBackoffMax, + version: r.version, + msgs: r.msgs, + stats: r.stats, + isolationLevel: r.config.IsolationLevel, + maxAttempts: r.config.MaxAttempts, // backwards-compatibility flags offsetOutOfRangeError: r.config.OffsetOutOfRangeError, @@ -1231,22 +1235,23 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) { // used as an way to asynchronously fetch messages while the main program reads // them using the high level reader API. type reader struct { - dialer *Dialer - logger Logger - errorLogger Logger - brokers []string - topic string - partition int - minBytes int - maxBytes int - maxWait time.Duration - backoffDelayMin time.Duration - backoffDelayMax time.Duration - version int64 - msgs chan<- readerMessage - stats *readerStats - isolationLevel IsolationLevel - maxAttempts int + dialer *Dialer + logger Logger + errorLogger Logger + brokers []string + topic string + partition int + minBytes int + maxBytes int + maxWait time.Duration + maxSafetyTimeout time.Duration + backoffDelayMin time.Duration + backoffDelayMax time.Duration + version int64 + msgs chan<- readerMessage + stats *readerStats + isolationLevel IsolationLevel + maxAttempts int offsetOutOfRangeError bool } @@ -1514,7 +1519,15 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err var size int64 var bytes int64 + deadline := time.Now().Add(r.maxSafetyTimeout) + conn.SetReadDeadline(deadline) + for { + if now := time.Now(); deadline.Sub(now) < (r.maxSafetyTimeout / 2) { + deadline = now.Add(r.maxSafetyTimeout) + conn.SetReadDeadline(deadline) + } + if msg, err = batch.ReadMessage(); err != nil { batch.Close() break