Skip to content

Commit

Permalink
services/horizon: Update cursor when bumping ledger to latest (#4150)
Browse files Browse the repository at this point in the history
Update cursor in Stellar-Core to latest ingested ledger when local ingestion is
behind the cluster.

If there is a DB ingesting instance more than one ingesting instance: either
Captive-Core or DB connected to another Stellar-Core, it's possible that the
cursor will be updated very rarely. This will make the Stellar-Core DB size
bloat as the old ledger will not be removed.
  • Loading branch information
bartekn authored Jan 10, 2022
1 parent ca65479 commit d33088d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 0 deletions.
8 changes: 8 additions & 0 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,14 @@ func (r resumeState) run(s *system) (transition, error) {
log.WithField("ingestLedger", ingestLedger).
WithField("lastIngestedLedger", lastIngestedLedger).
Info("bumping ingest ledger to next ledger after ingested ledger in db")

// Update cursor if there's more than one ingesting instance: either
// Captive-Core or DB ingestion connected to another Stellar-Core.
if err = s.updateCursor(lastIngestedLedger); err != nil {
// Don't return updateCursor error.
log.WithError(err).Warn("error updating stellar-core cursor")
}

return retryResume(resumeState{
latestSuccessfullyProcessedLedger: lastIngestedLedger,
}), nil
Expand Down
7 changes: 7 additions & 0 deletions services/horizon/internal/ingest/resume_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,13 @@ func (s *ResumeTestTestSuite) TestBumpIngestLedger() {
s.historyQ.On("Begin").Return(nil).Once()
s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(101), nil).Once()

s.stellarCoreClient.On(
"SetCursor",
mock.AnythingOfType("*context.timerCtx"),
defaultCoreCursorName,
int32(101),
).Return(errors.New("my error")).Once()

next, err := resumeState{latestSuccessfullyProcessedLedger: 99}.run(s.system)
s.Assert().NoError(err)
s.Assert().Equal(
Expand Down

0 comments on commit d33088d

Please sign in to comment.