Skip to content

Commit

Permalink
Forgot to fix the off-by-one issue
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio committed Jun 24, 2020
1 parent c22b092 commit 64f44a6
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
7 changes: 4 additions & 3 deletions services/horizon/internal/expingest/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS
wait sync.WaitGroup
stop = make(chan struct{})
batchSize = calculateParallelLedgerBatchSize(toLedger-fromLedger, batchSizeSuggestion, ps.workerCount)
totalJobs = uint32(math.Ceil(float64(toLedger-fromLedger) / float64(batchSize)))
// we add one because both toLedger and fromLedger are included in the rabge
totalJobs = uint32(math.Ceil(float64(toLedger-fromLedger+1) / float64(batchSize)))
)

wait.Add(1)
Expand All @@ -113,7 +114,7 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS
defer wait.Done()
for subRangeFrom := fromLedger; subRangeFrom < toLedger; {
// job queuing
subRangeTo := subRangeFrom + batchSize
subRangeTo := subRangeFrom + (batchSize - 1) // we subtract one because both from and to are part of the batch
if subRangeTo > toLedger {
subRangeTo = toLedger
}
Expand All @@ -122,7 +123,7 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS
return
case ps.reingestJobQueue <- ledgerRange{subRangeFrom, subRangeTo}:
}
subRangeFrom = subRangeTo
subRangeFrom = subRangeTo + 1
}
}()

Expand Down
8 changes: 4 additions & 4 deletions services/horizon/internal/expingest/parallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func TestParallelReingestRange(t *testing.T) {

sort.Sort(rangesCalled)
expected := sorteableRanges{
{from: 0, to: 256}, {from: 256, to: 512}, {from: 512, to: 768}, {from: 768, to: 1024}, {from: 1024, to: 1280},
{from: 1280, to: 1536}, {from: 1536, to: 1792}, {from: 1792, to: 2048}, {from: 2048, to: 2050},
{from: 0, to: 255}, {from: 256, to: 511}, {from: 512, to: 767}, {from: 768, to: 1023}, {from: 1024, to: 1279},
{from: 1280, to: 1535}, {from: 1536, to: 1791}, {from: 1792, to: 2047}, {from: 2048, to: 2050},
}
assert.Equal(t, expected, rangesCalled)
system.Shutdown()
Expand All @@ -81,7 +81,7 @@ func TestParallelReingestRangeError(t *testing.T) {
factory := func(c Config) (System, error) {
result := &mockSystem{}
// Fail on the second range
result.On("ReingestRange", uint32(1536), uint32(1792), mock.AnythingOfType("bool")).Return(errors.New("failed because of foo"))
result.On("ReingestRange", uint32(1536), uint32(1791), mock.AnythingOfType("bool")).Return(errors.New("failed because of foo"))
result.On("ReingestRange", mock.AnythingOfType("uint32"), mock.AnythingOfType("uint32"), mock.AnythingOfType("bool")).Return(error(nil))
result.On("Shutdown").Run(func(mock.Arguments) {
m.Lock()
Expand All @@ -94,7 +94,7 @@ func TestParallelReingestRangeError(t *testing.T) {
assert.NoError(t, err)
err = system.ReingestRange(0, 2050, 258)
assert.Error(t, err)
assert.Equal(t, "in subrange 1536 to 1792: failed because of foo", err.Error())
assert.Equal(t, "in subrange 1536 to 1791: failed because of foo", err.Error())

system.Shutdown()
assert.Equal(t, 3, shutdowns)
Expand Down

0 comments on commit 64f44a6

Please sign in to comment.