Skip to content

Commit

Permalink
[remove] test
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu committed Oct 10, 2024
1 parent e5a7e27 commit 76008da
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions shard_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,12 @@ mainloop:
continue mainloop
}

// sleep for 5 minutes + 5 sec, to make sure the shard iterator expires
fmt.Println("Sleeping for 305 seconds")
fmt.Printf("pre-sleep time.Now: %s\n", time.Now().Format(time.RFC3339))
time.Sleep(305 * time.Second)
fmt.Printf("post-sleep time.Now: %s\n", time.Now().Format(time.RFC3339))

// Get records from kinesis
records, next, lag, err := getRecords(k.kinesis, iterator)

Expand All @@ -208,12 +214,14 @@ mainloop:

switch awsErr.Code() {
case kinesis.ErrCodeExpiredIteratorException:
fmt.Printf("Renewing expired shard [id: %s] iterator\n", shardID)
newIterator, ierr := getShardIterator(k.kinesis, k.streamName, shardID, lastSeqToCheckp, nil)
if ierr != nil {
k.shardErrors <- shardConsumerError{shardID: shardID, action: "getShardIterator", err: err}
return
}
iterator = newIterator
fmt.Printf("Renewed expired shard [id: %s] iterator successfully\n", shardID)
// should retry after expired iterator is renewed successfully
shouldRetry = true
}
Expand Down

0 comments on commit 76008da

Please sign in to comment.