Skip to content

Commit

Permalink
config safety timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
moogacs committed Sep 16, 2022
1 parent d4b89e7 commit ef2a189
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,12 @@ type ReaderConfig struct {
// Default: 10s
MaxWait time.Duration

// MaxSafetyTimout amount of time to wait for new data to come when fetching message
// forom a batch
//
// Default: 10s
MaxSafetyTimout time.Duration

// ReadLagInterval sets the frequency at which the reader lag is updated.
// Setting this field to a negative value disables lag reporting.
ReadLagInterval time.Duration
Expand Down Expand Up @@ -649,6 +655,10 @@ func NewReader(config ReaderConfig) *Reader {
config.MaxWait = 10 * time.Second
}

if config.MaxSafetyTimout == 0 {
config.MaxSafetyTimout = 10 * time.Second
}

if config.ReadLagInterval == 0 {
config.ReadLagInterval = 1 * time.Minute
}
Expand Down Expand Up @@ -1190,6 +1200,7 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
minBytes: r.config.MinBytes,
maxBytes: r.config.MaxBytes,
maxWait: r.config.MaxWait,
maxSafetyTimout: r.config.MaxSafetyTimout,
backoffDelayMin: r.config.ReadBackoffMin,
backoffDelayMax: r.config.ReadBackoffMax,
version: r.version,
Expand Down Expand Up @@ -1218,6 +1229,7 @@ type reader struct {
minBytes int
maxBytes int
maxWait time.Duration
maxSafetyTimout time.Duration
backoffDelayMin time.Duration
backoffDelayMax time.Duration
version int64
Expand Down Expand Up @@ -1492,15 +1504,10 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err
var size int64
var bytes int64

const safetyTimeout = 10 * time.Second
deadline := time.Now().Add(safetyTimeout)
conn.SetReadDeadline(deadline)
conn.SetReadDeadline(time.Now().Add(r.maxSafetyTimout))

for {
if now := time.Now(); deadline.Sub(now) < (safetyTimeout / 2) {
deadline = now.Add(safetyTimeout)
conn.SetReadDeadline(deadline)
}
conn.SetReadDeadline(time.Now().Add(r.maxSafetyTimout))

if msg, err = batch.ReadMessage(); err != nil {
batch.Close()
Expand Down

0 comments on commit ef2a189

Please sign in to comment.