Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu committed Oct 10, 2024
1 parent c77820e commit e5a7e27
Showing 1 changed file with 16 additions and 45 deletions.
61 changes: 16 additions & 45 deletions shard_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,40 +56,15 @@ func getShardIterator(k kinesisiface.KinesisAPI, streamName string, shardID stri
}

// getRecords returns the next records and shard iterator from the given shard iterator
func (k *Kinsumer) getRecords(kApi kinesisiface.KinesisAPI, streamName string, shardID string, iterator string) (records []*kinesis.Record, nextIterator string, lag time.Duration, err error) {
func getRecords(k kinesisiface.KinesisAPI, iterator string) (records []*kinesis.Record, nextIterator string, lag time.Duration, err error) {
params := &kinesis.GetRecordsInput{
Limit: aws.Int64(getRecordsLimit),
ShardIterator: aws.String(iterator),
}

// sleep for 5 minutes + 5 sec, to make sure the shard iterator expires
fmt.Println("Sleeping for 305 seconds")
fmt.Printf("pre-sleep time.Now: %s\n", time.Now().Format(time.RFC3339))
time.Sleep(305 * time.Second)
fmt.Printf("post-sleep time.Now: %s\n", time.Now().Format(time.RFC3339))

output, err := kApi.GetRecords(params)
output, err := k.GetRecords(params)

if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case kinesis.ErrCodeExpiredIteratorException:
fmt.Printf("Renewing expired shard [id: %s] iterator\n", shardID)

seqNum, err := k.getLastSequenceNumber(shardID)
if err != nil {
return nil, "", 0, err
}

it, ierr := getShardIterator(kApi, streamName, shardID, seqNum, nil)
if ierr != nil {
fmt.Printf("Failed to renew shard [id: %s] iterator\n", shardID)
return nil, "", 0, ierr
}
fmt.Printf("Renewed expired shard [id: %s] iterator successfully\n", shardID)
return k.getRecords(kApi, streamName, shardID, it)
}
}
return nil, "", 0, err
}

Expand Down Expand Up @@ -131,23 +106,6 @@ func (k *Kinsumer) captureShard(shardID string) (*checkpointer, error) {
}
}

func (k *Kinsumer) getLastSequenceNumber(shardID string) (string, error) {
// capture the checkpointer
checkpointer, err := k.captureShard(shardID)
if err != nil {
k.shardErrors <- shardConsumerError{shardID: shardID, action: "captureShard", err: err}
return "", err
}

// if we failed to capture the checkpointer but there was no errors
// we must have stopped, so don't process this shard at all
if checkpointer == nil {
return "", nil
}

return checkpointer.sequenceNumber, nil
}

// consume is a blocking call that captures then consumes the given shard in a loop.
// It is also responsible for writing out the checkpoint updates to dynamo.
// TODO: There are no tests for this file. Not sure how to even unit test this.
Expand Down Expand Up @@ -236,7 +194,7 @@ mainloop:
}

// Get records from kinesis
records, next, lag, err := k.getRecords(k.kinesis, k.streamName, shardID, iterator)
records, next, lag, err := getRecords(k.kinesis, iterator)

if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
Expand All @@ -247,6 +205,19 @@ mainloop:
k.config.logger.Log("Got error: %s %s %sretry count is %d / %d", awsErr.Code(), awsErr.Message(), origErrStr, retryCount, maxErrorRetries)
// Only retry for errors that should be retried; notably, don't retry serialization errors because something bad is happening
shouldRetry := request.IsErrorRetryable(err) || request.IsErrorThrottle(err)

switch awsErr.Code() {
case kinesis.ErrCodeExpiredIteratorException:
newIterator, ierr := getShardIterator(k.kinesis, k.streamName, shardID, lastSeqToCheckp, nil)
if ierr != nil {
k.shardErrors <- shardConsumerError{shardID: shardID, action: "getShardIterator", err: err}
return
}
iterator = newIterator
// should retry after expired iterator is renewed successfully
shouldRetry = true
}

if shouldRetry && retryCount < maxErrorRetries {
retryCount++

Expand Down

0 comments on commit e5a7e27

Please sign in to comment.