Skip to content

Commit

Permalink
Remove unused method
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Jan 13, 2022
1 parent 023d356 commit 5170f8b
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ public class KinesisShardCheckpointer
private String logicalProcessName;
private int currentIterationNumber;
private KinesisClientLease kinesisClientLease;
private long checkpointIntervalMillis;
private long nextCheckpointTimeMillis;

public KinesisShardCheckpointer(
AmazonDynamoDB dynamoDBClient,
Expand Down Expand Up @@ -65,7 +63,6 @@ public KinesisShardCheckpointer(
this.kinesisSplit = kinesisSplit;
this.logicalProcessName = logicalProcessName;
this.currentIterationNumber = currentIterationNumber;
this.checkpointIntervalMillis = checkpointIntervalMS;

try {
this.leaseManager.createLeaseTableIfNotExists(dynamoReadCapacity, dynamoWriteCapacity);
Expand All @@ -82,12 +79,6 @@ public KinesisShardCheckpointer(
catch (ProvisionedThroughputException | InvalidStateException | DependencyException e) {
throw new RuntimeException(e);
}
resetNextCheckpointTime();
}

private void resetNextCheckpointTime()
{
nextCheckpointTimeMillis = System.nanoTime() + checkpointIntervalMillis * 1_000_000;
}

private String createCheckpointKey(int iterationNo)
Expand Down Expand Up @@ -117,7 +108,6 @@ public void checkpoint(String lastReadSequenceNumber)
catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
throw new RuntimeException(e);
}
resetNextCheckpointTime();
}

//return checkpoint of previous iteration if found
Expand Down Expand Up @@ -145,11 +135,4 @@ public String getLastReadSeqNumber()
}
return lastReadSeqNumber;
}

public void checkpointIfTimeUp(String lastReadSeqNo)
{
if (System.nanoTime() >= nextCheckpointTimeMillis) {
checkpoint(lastReadSeqNo);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,6 @@ public List<InternalShard> getShardsFrom(String afterShardId)
}
}

public void activate()
{
this.streamStatus = "ACTIVE";
}

public PutRecordResult putRecord(ByteBuffer data, String partitionKey)
{
// Create record and insert into the shards. Initially just do it
Expand All @@ -203,13 +198,6 @@ record = record.withData(data).withPartitionKey(partitionKey).withSequenceNumber

return result;
}

public void clearRecords()
{
for (InternalShard shard : this.shards) {
shard.clearRecords();
}
}
}

public static class ShardIterator
Expand Down

0 comments on commit 5170f8b

Please sign in to comment.