Skip to content

Commit

Permalink
Retry infinitely on expired shard iterator error
Browse files Browse the repository at this point in the history
  • Loading branch information
adatzer authored and colmsnowplow committed Nov 13, 2024
1 parent 9b8b911 commit 8cc8e5a
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions shard_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,22 +202,25 @@ mainloop:
if awsErr.OrigErr() != nil {
origErrStr = fmt.Sprintf("(%s) ", awsErr.OrigErr())
}
k.config.logger.Log("Got error: %s %s %sretry count is %d / %d", awsErr.Code(), awsErr.Message(), origErrStr, retryCount, maxErrorRetries)
// Only retry for errors that should be retried; notably, don't retry serialization errors because something bad is happening
shouldRetry := request.IsErrorRetryable(err) || request.IsErrorThrottle(err)

switch awsErr.Code() {
case kinesis.ErrCodeExpiredIteratorException:
k.config.logger.Log("Got error: %s %s %s", awsErr.Code(), awsErr.Message(), origErrStr)
newIterator, ierr := getShardIterator(k.kinesis, k.streamName, shardID, lastSeqToCheckp, nil)
if ierr != nil {
k.shardErrors <- shardConsumerError{shardID: shardID, action: "getShardIterator", err: err}
return
}
iterator = newIterator
// should retry after expired iterator is renewed successfully
shouldRetry = true

// retry infinitely after expired iterator is renewed successfully
continue mainloop
}

// Only retry for errors that should be retried; notably, don't retry serialization errors because something bad is happening
shouldRetry := request.IsErrorRetryable(err) || request.IsErrorThrottle(err)
k.config.logger.Log("Got error: %s %s %sretry count is %d / %d", awsErr.Code(), awsErr.Message(), origErrStr, retryCount, maxErrorRetries)

if shouldRetry && retryCount < maxErrorRetries {
retryCount++

Expand Down

0 comments on commit 8cc8e5a

Please sign in to comment.