Skip to content

Commit

Permalink
add configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
moogacs committed Sep 14, 2022
1 parent 99ec20f commit 216d5b1
Showing 1 changed file with 45 additions and 32 deletions.
77 changes: 45 additions & 32 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,9 @@ type ReaderConfig struct {
// Default: 10s
MaxWait time.Duration

// MaxSafetyTimeout
MaxSafetyTimeout time.Duration

// ReadLagInterval sets the frequency at which the reader lag is updated.
// Setting this field to a negative value disables lag reporting.
ReadLagInterval time.Duration
Expand Down Expand Up @@ -1203,22 +1206,23 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
defer join.Done()

(&reader{
dialer: r.config.Dialer,
logger: r.config.Logger,
errorLogger: r.config.ErrorLogger,
brokers: r.config.Brokers,
topic: key.topic,
partition: int(key.partition),
minBytes: r.config.MinBytes,
maxBytes: r.config.MaxBytes,
maxWait: r.config.MaxWait,
backoffDelayMin: r.config.ReadBackoffMin,
backoffDelayMax: r.config.ReadBackoffMax,
version: r.version,
msgs: r.msgs,
stats: r.stats,
isolationLevel: r.config.IsolationLevel,
maxAttempts: r.config.MaxAttempts,
dialer: r.config.Dialer,
logger: r.config.Logger,
errorLogger: r.config.ErrorLogger,
brokers: r.config.Brokers,
topic: key.topic,
partition: int(key.partition),
minBytes: r.config.MinBytes,
maxBytes: r.config.MaxBytes,
maxWait: r.config.MaxWait,
maxSafetyTimeout: r.config.MaxSafetyTimeout,
backoffDelayMin: r.config.ReadBackoffMin,
backoffDelayMax: r.config.ReadBackoffMax,
version: r.version,
msgs: r.msgs,
stats: r.stats,
isolationLevel: r.config.IsolationLevel,
maxAttempts: r.config.MaxAttempts,

// backwards-compatibility flags
offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
Expand All @@ -1231,22 +1235,23 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
// used as an way to asynchronously fetch messages while the main program reads
// them using the high level reader API.
type reader struct {
dialer *Dialer
logger Logger
errorLogger Logger
brokers []string
topic string
partition int
minBytes int
maxBytes int
maxWait time.Duration
backoffDelayMin time.Duration
backoffDelayMax time.Duration
version int64
msgs chan<- readerMessage
stats *readerStats
isolationLevel IsolationLevel
maxAttempts int
dialer *Dialer
logger Logger
errorLogger Logger
brokers []string
topic string
partition int
minBytes int
maxBytes int
maxWait time.Duration
maxSafetyTimeout time.Duration
backoffDelayMin time.Duration
backoffDelayMax time.Duration
version int64
msgs chan<- readerMessage
stats *readerStats
isolationLevel IsolationLevel
maxAttempts int

offsetOutOfRangeError bool
}
Expand Down Expand Up @@ -1514,7 +1519,15 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err
var size int64
var bytes int64

deadline := time.Now().Add(r.maxSafetyTimeout)
conn.SetReadDeadline(deadline)

for {
if now := time.Now(); deadline.Sub(now) < (r.maxSafetyTimeout / 2) {
deadline = now.Add(r.maxSafetyTimeout)
conn.SetReadDeadline(deadline)
}

if msg, err = batch.ReadMessage(); err != nil {
batch.Close()
break
Expand Down

0 comments on commit 216d5b1

Please sign in to comment.