Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

service/horizon: Add protocol version support check #3093

Merged
merged 1 commit into from
Oct 5, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion services/horizon/internal/expingest/db_integration_test.go
Original file line number Diff line number Diff line change
@@ -124,7 +124,7 @@ func (s *DBTestSuite) setupMocksForBuildState() {
},
},
nil,
).Once()
).Twice()
}

func (s *DBTestSuite) TearDownTest() {
4 changes: 4 additions & 0 deletions services/horizon/internal/expingest/main.go
Original file line number Diff line number Diff line change
@@ -22,6 +22,10 @@ import (
)

const (
// MaxSupportedProtocolVersion defines the maximum supported version of
// the Stellar protocol.
MaxSupportedProtocolVersion = 14

// CurrentVersion reflects the latest version of the ingestion
// algorithm. This value is stored in KV store and is used to decide
// if there's a need to reprocess the ledger state or reingest data.
38 changes: 38 additions & 0 deletions services/horizon/internal/expingest/processor_runner.go
Original file line number Diff line number Diff line change
@@ -129,6 +129,32 @@ func (s *ProcessorRunner) buildTransactionProcessor(
}
}

// checkIfProtocolVersionSupported checks if this Horizon version supports the
// protocol version of a ledger with the given sequence number.
func (s *ProcessorRunner) checkIfProtocolVersionSupported(ledgerSequence uint32) error {
exists, ledgerCloseMeta, err := s.ledgerBackend.GetLedger(ledgerSequence)
if err != nil {
return errors.Wrap(err, "Error getting ledger")
}

if !exists {
return errors.New("cannot check if protocol version supported: ledger does not exist")
}

ledgerVersion := ledgerCloseMeta.V0.LedgerHeader.Header.LedgerVersion

if ledgerVersion > MaxSupportedProtocolVersion {
return fmt.Errorf(
"This Horizon version does not support protocol version %d. "+
"The latest supported protocol version is %d. Please upgrade to the latest Horizon version.",
ledgerVersion,
MaxSupportedProtocolVersion,
)
}

return nil
}

// validateBucketList validates if the bucket list hash in history archive
// matches the one in corresponding ledger header in stellar-core backend.
// This gives you full security if data in stellar-core backend can be trusted
@@ -179,6 +205,10 @@ func (s *ProcessorRunner) RunHistoryArchiveIngestion(checkpointLedger uint32) (i
NetworkPassphrase: s.config.NetworkPassphrase,
}
} else {
if err = s.checkIfProtocolVersionSupported(checkpointLedger); err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Protocol version not supported")
}

if err = s.validateBucketList(checkpointLedger); err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error validating bucket list from HAS")
}
@@ -248,6 +278,10 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger uint32) (io.St
return ledgerTransactionStats.GetResults(), errors.Wrap(err, "Error creating ledger reader")
}

if err = s.checkIfProtocolVersionSupported(ledger); err != nil {
return ledgerTransactionStats.GetResults(), errors.Wrap(err, "Protocol version not supported")
}

txProcessor := s.buildTransactionProcessor(&ledgerTransactionStats, transactionReader.GetHeader())
err = io.StreamLedgerTransactions(txProcessor, transactionReader)
if err != nil {
@@ -266,6 +300,10 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(sequence uint32) (io.StatsCha
changeStats := io.StatsChangeProcessor{}
var statsLedgerTransactionProcessorResults io.StatsLedgerTransactionProcessorResults

if err := s.checkIfProtocolVersionSupported(sequence); err != nil {
return changeStats.GetResults(), statsLedgerTransactionProcessorResults, errors.Wrap(err, "Protocol version not supported")
}

err := s.runChangeProcessorOnLedger(
s.buildChangeProcessor(&changeStats, ledgerSource, sequence),
sequence,
144 changes: 142 additions & 2 deletions services/horizon/internal/expingest/processor_runner_test.go
Original file line number Diff line number Diff line change
@@ -125,7 +125,7 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) {
},
},
nil,
).Once()
).Twice() // 2nd time for protocol version check

// Batches
mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
@@ -187,6 +187,71 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) {
assert.NoError(t, err)
}

func TestProcessorRunnerRunHistoryArchiveIngestionProtocolVersionNotSupported(t *testing.T) {
maxBatchSize := 100000

config := Config{
NetworkPassphrase: network.PublicNetworkPassphrase,
}

q := &mockDBQ{}
defer mock.AssertExpectationsForObjects(t, q)
historyAdapter := &adapters.MockHistoryArchiveAdapter{}
defer mock.AssertExpectationsForObjects(t, historyAdapter)
ledgerBackend := &ledgerbackend.MockDatabaseBackend{}
defer mock.AssertExpectationsForObjects(t, ledgerBackend)

ledgerBackend.On("GetLedger", uint32(100)).
Return(
true,
xdr.LedgerCloseMeta{
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerVersion: 200,
},
},
},
},
nil,
).Once()

// Batches
mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockOffersBatchInsertBuilder)
q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize).
Return(mockOffersBatchInsertBuilder).Once()

mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockAccountDataBatchInsertBuilder)
q.MockQData.On("NewAccountDataBatchInsertBuilder", maxBatchSize).
Return(mockAccountDataBatchInsertBuilder).Once()

mockClaimableBalancesBatchInsertBuilder := &history.MockClaimableBalancesBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockClaimableBalancesBatchInsertBuilder)
q.MockQClaimableBalances.On("NewClaimableBalancesBatchInsertBuilder", maxBatchSize).
Return(mockClaimableBalancesBatchInsertBuilder).Once()

mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder)
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize).
Return(mockAccountSignersBatchInsertBuilder).Once()

q.MockQAssetStats.On("InsertAssetStats", []history.ExpAssetStat{}, 100000).
Return(nil)

runner := ProcessorRunner{
ctx: context.Background(),
config: config,
historyQ: q,
historyAdapter: historyAdapter,
ledgerBackend: ledgerBackend,
}

_, err := runner.RunHistoryArchiveIngestion(100)
assert.EqualError(t, err, "Protocol version not supported: This Horizon version does not support protocol version 200. The latest supported protocol version is 14. Please upgrade to the latest Horizon version.")
}

func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
maxBatchSize := 100000

@@ -289,6 +354,10 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) {
},
}

// Method called 4 times:
// - Protocol version check,
// - Changes reader,
// - Transactions reader (includes protocol check again because it's a public method).
ledgerBackend.On("GetLedger", uint32(63)).
Return(
true,
@@ -298,7 +367,7 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) {
},
},
nil,
).Twice() // Twice = changes then transactions
).Times(4)

// Batches
mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
@@ -349,3 +418,74 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) {
_, _, err := runner.RunAllProcessorsOnLedger(63)
assert.NoError(t, err)
}

func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *testing.T) {
maxBatchSize := 100000

config := Config{
NetworkPassphrase: network.PublicNetworkPassphrase,
}

q := &mockDBQ{}
defer mock.AssertExpectationsForObjects(t, q)
ledgerBackend := &ledgerbackend.MockDatabaseBackend{}
defer mock.AssertExpectationsForObjects(t, ledgerBackend)

ledger := xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerVersion: 200,
},
}

ledgerBackend.On("GetLedger", uint32(63)).
Return(
true,
xdr.LedgerCloseMeta{
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: ledger,
},
},
nil,
).Once()

// Batches
mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockOffersBatchInsertBuilder)
q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize).
Return(mockOffersBatchInsertBuilder).Once()

mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockAccountDataBatchInsertBuilder)
q.MockQData.On("NewAccountDataBatchInsertBuilder", maxBatchSize).
Return(mockAccountDataBatchInsertBuilder).Once()

mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder)
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize).
Return(mockAccountSignersBatchInsertBuilder).Once()

mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockOperationsBatchInsertBuilder)
q.MockQOperations.On("NewOperationBatchInsertBuilder", maxBatchSize).
Return(mockOperationsBatchInsertBuilder).Twice()

mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder)
q.MockQTransactions.On("NewTransactionBatchInsertBuilder", maxBatchSize).
Return(mockTransactionsBatchInsertBuilder).Twice()

mockClaimableBalancesBatchInsertBuilder := &history.MockClaimableBalancesBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockClaimableBalancesBatchInsertBuilder)
q.MockQClaimableBalances.On("NewClaimableBalancesBatchInsertBuilder", maxBatchSize).
Return(mockClaimableBalancesBatchInsertBuilder).Once()

runner := ProcessorRunner{
ctx: context.Background(),
config: config,
historyQ: q,
ledgerBackend: ledgerBackend,
}

_, _, err := runner.RunAllProcessorsOnLedger(63)
assert.EqualError(t, err, "Protocol version not supported: This Horizon version does not support protocol version 200. The latest supported protocol version is 14. Please upgrade to the latest Horizon version.")
}