diff --git a/services/horizon/internal/expingest/parallel.go b/services/horizon/internal/expingest/parallel.go index 7630addcd6..620d98a77d 100644 --- a/services/horizon/internal/expingest/parallel.go +++ b/services/horizon/internal/expingest/parallel.go @@ -99,7 +99,8 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS wait sync.WaitGroup stop = make(chan struct{}) batchSize = calculateParallelLedgerBatchSize(toLedger-fromLedger, batchSizeSuggestion, ps.workerCount) - totalJobs = uint32(math.Ceil(float64(toLedger-fromLedger) / float64(batchSize))) + // we add one because both toLedger and fromLedger are included in the rabge + totalJobs = uint32(math.Ceil(float64(toLedger-fromLedger+1) / float64(batchSize))) ) wait.Add(1) @@ -113,7 +114,7 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS defer wait.Done() for subRangeFrom := fromLedger; subRangeFrom < toLedger; { // job queuing - subRangeTo := subRangeFrom + batchSize + subRangeTo := subRangeFrom + (batchSize - 1) // we subtract one because both from and to are part of the batch if subRangeTo > toLedger { subRangeTo = toLedger } @@ -122,7 +123,7 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS return case ps.reingestJobQueue <- ledgerRange{subRangeFrom, subRangeTo}: } - subRangeFrom = subRangeTo + subRangeFrom = subRangeTo + 1 } }() diff --git a/services/horizon/internal/expingest/parallel_test.go b/services/horizon/internal/expingest/parallel_test.go index ad6165b821..676b6b0ce1 100644 --- a/services/horizon/internal/expingest/parallel_test.go +++ b/services/horizon/internal/expingest/parallel_test.go @@ -63,8 +63,8 @@ func TestParallelReingestRange(t *testing.T) { sort.Sort(rangesCalled) expected := sorteableRanges{ - {from: 0, to: 256}, {from: 256, to: 512}, {from: 512, to: 768}, {from: 768, to: 1024}, {from: 1024, to: 1280}, - {from: 1280, to: 1536}, {from: 1536, to: 1792}, {from: 1792, to: 2048}, {from: 2048, to: 2050}, + {from: 0, to: 255}, {from: 256, to: 511}, {from: 512, to: 767}, {from: 768, to: 1023}, {from: 1024, to: 1279}, + {from: 1280, to: 1535}, {from: 1536, to: 1791}, {from: 1792, to: 2047}, {from: 2048, to: 2050}, } assert.Equal(t, expected, rangesCalled) system.Shutdown() @@ -81,7 +81,7 @@ func TestParallelReingestRangeError(t *testing.T) { factory := func(c Config) (System, error) { result := &mockSystem{} // Fail on the second range - result.On("ReingestRange", uint32(1536), uint32(1792), mock.AnythingOfType("bool")).Return(errors.New("failed because of foo")) + result.On("ReingestRange", uint32(1536), uint32(1791), mock.AnythingOfType("bool")).Return(errors.New("failed because of foo")) result.On("ReingestRange", mock.AnythingOfType("uint32"), mock.AnythingOfType("uint32"), mock.AnythingOfType("bool")).Return(error(nil)) result.On("Shutdown").Run(func(mock.Arguments) { m.Lock() @@ -94,7 +94,7 @@ func TestParallelReingestRangeError(t *testing.T) { assert.NoError(t, err) err = system.ReingestRange(0, 2050, 258) assert.Error(t, err) - assert.Equal(t, "in subrange 1536 to 1792: failed because of foo", err.Error()) + assert.Equal(t, "in subrange 1536 to 1791: failed because of foo", err.Error()) system.Shutdown() assert.Equal(t, 3, shutdowns)