Skip to content

Commit

Permalink
add config for no timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
moogacs committed Sep 14, 2022
1 parent 99ec20f commit ce461a4
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ type ReaderConfig struct {

// Maximum amount of time to wait for new data to come when fetching batches
// of messages from kafka.
// if value is -1 means will not timeout waiting for new msgs to come.
//
// Default: 10s
MaxWait time.Duration
Expand Down Expand Up @@ -1497,7 +1498,12 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err
r.stats.offset.observe(offset)

t0 := time.Now()
conn.SetReadDeadline(t0.Add(r.maxWait))

if r.maxWait == -1 {
conn.SetReadDeadline(time.Time{})
} else {
conn.SetReadDeadline(t0.Add(r.maxWait))
}

batch := conn.ReadBatchWith(ReadBatchConfig{
MinBytes: r.minBytes,
Expand Down

0 comments on commit ce461a4

Please sign in to comment.