Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Use encoding buffer in state verifier #4069

Merged
merged 1 commit into from
Nov 13, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions services/horizon/internal/ingest/verify.go
Original file line number Diff line number Diff line change
@@ -152,9 +152,7 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error {
}
defer stateReader.Close()

verifier := &verify.StateVerifier{
StateReader: stateReader,
}
verifier := verify.NewStateVerifier(stateReader, nil)

assetStats := processors.AssetStatSet{}
total := int64(0)
44 changes: 27 additions & 17 deletions services/horizon/internal/ingest/verify/main.go
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ type TransformLedgerEntryFunction func(xdr.LedgerEntry) (ignore bool, newEntry x
// StateVerifier verifies if ledger entries provided by Add method are the same
// as in the checkpoint ledger entries provided by CheckpointChangeReader.
// The algorithm works in the following way:
// 0. Develop TransformFunction. It should remove all fields and objects not
// 0. Develop `transformFunction`. It should remove all fields and objects not
// stored in your app. For example, if you only store accounts, all other
// ledger entry types should be ignored (return ignore = true).
// 1. In a loop, get entries from history archive by calling GetEntries()
@@ -32,19 +32,28 @@ type TransformLedgerEntryFunction func(xdr.LedgerEntry) (ignore bool, newEntry x
// entries in your storage (to find if some extra entires exist in your
// storage).
// Functions will return StateError type if state is found to be incorrect.
// It's user responsibility to call `StateReader.Close()` when reading is done.
// It's user responsibility to call `stateReader.Close()` when reading is done.
// Check Horizon for an example how to use this tool.
type StateVerifier struct {
StateReader ingest.ChangeReader
// TransformFunction transforms (or ignores) ledger entries streamed from
stateReader ingest.ChangeReader
// transformFunction transforms (or ignores) ledger entries streamed from
// checkpoint buckets to match the form added by `Write`. Read
// TransformLedgerEntryFunction godoc for more information.
TransformFunction TransformLedgerEntryFunction
transformFunction TransformLedgerEntryFunction

readEntries int
readingDone bool

currentEntries map[string]xdr.LedgerEntry
encodingBuffer *xdr.EncodingBuffer
}

func NewStateVerifier(stateReader ingest.ChangeReader, tf TransformLedgerEntryFunction) *StateVerifier {
return &StateVerifier{
stateReader: stateReader,
transformFunction: tf,
encodingBuffer: xdr.NewEncodingBuffer(),
}
}

// GetLedgerKeys returns up to `count` ledger keys from history buckets
@@ -59,7 +68,7 @@ func (v *StateVerifier) GetLedgerKeys(count int) ([]xdr.LedgerKey, error) {
v.currentEntries = make(map[string]xdr.LedgerEntry)

for count > 0 {
entryChange, err := v.StateReader.Read()
entryChange, err := v.stateReader.Read()
if err != nil {
if err == io.EOF {
v.readingDone = true
@@ -70,15 +79,15 @@ func (v *StateVerifier) GetLedgerKeys(count int) ([]xdr.LedgerKey, error) {

entry := *entryChange.Post

if v.TransformFunction != nil {
ignore, _ := v.TransformFunction(entry)
if v.transformFunction != nil {
ignore, _ := v.transformFunction(entry)
if ignore {
continue
}
}

ledgerKey := entry.LedgerKey()
key, err := ledgerKey.MarshalBinary()
key, err := v.encodingBuffer.MarshalBinary(ledgerKey)
if err != nil {
return keys, errors.Wrap(err, "Error marshaling ledgerKey")
}
@@ -101,12 +110,13 @@ func (v *StateVerifier) GetLedgerKeys(count int) ([]xdr.LedgerKey, error) {
// Any `StateError` returned by this method indicates invalid state!
func (v *StateVerifier) Write(entry xdr.LedgerEntry) error {
actualEntry := entry.Normalize()
actualEntryMarshaled, err := actualEntry.MarshalBinary()
actualEntryMarshaled, err := v.encodingBuffer.MarshalBinary(actualEntry)
if err != nil {
return errors.Wrap(err, "Error marshaling actualEntry")
}

key, err := actualEntry.LedgerKey().MarshalBinary()
// safe, since we convert to string right away (causing a copy)
key, err := v.encodingBuffer.UnsafeMarshalBinary(actualEntry.LedgerKey())
if err != nil {
return errors.Wrap(err, "Error marshaling ledgerKey")
}
@@ -122,25 +132,25 @@ func (v *StateVerifier) Write(entry xdr.LedgerEntry) error {
delete(v.currentEntries, string(key))

preTransformExpectedEntry := expectedEntry
preTransformExpectedEntryMarshaled, err := preTransformExpectedEntry.MarshalBinary()
preTransformExpectedEntryMarshaled, err := v.encodingBuffer.MarshalBinary(&preTransformExpectedEntry)
if err != nil {
return errors.Wrap(err, "Error marshaling preTransformExpectedEntry")
}

if v.TransformFunction != nil {
if v.transformFunction != nil {
var ignore bool
ignore, expectedEntry = v.TransformFunction(expectedEntry)
ignore, expectedEntry = v.transformFunction(expectedEntry)
// Extra check: if entry was ignored in GetEntries, it shouldn't be
// ignored here.
if ignore {
return errors.Errorf(
"Entry ignored in GetEntries but not ignored in Write: %s. Possibly TransformFunction is buggy.",
"Entry ignored in GetEntries but not ignored in Write: %s. Possibly transformFunction is buggy.",
base64.StdEncoding.EncodeToString(preTransformExpectedEntryMarshaled),
)
}
}

expectedEntryMarshaled, err := expectedEntry.MarshalBinary()
expectedEntryMarshaled, err := v.encodingBuffer.MarshalBinary(&expectedEntry)
if err != nil {
return errors.Wrap(err, "Error marshaling expectedEntry")
}
@@ -195,7 +205,7 @@ func (v *StateVerifier) checkUnreadEntries() error {
}

// Ignore error as StateError below is more important
entryString, _ := xdr.MarshalBase64(entry)
entryString, _ := v.encodingBuffer.MarshalBase64(&entry)
return ingest.NewStateError(errors.Errorf(
"Entries (%d) not found locally, example: %s",
len(v.currentEntries),
14 changes: 6 additions & 8 deletions services/horizon/internal/ingest/verify/main_test.go
Original file line number Diff line number Diff line change
@@ -36,9 +36,7 @@ type StateVerifierTestSuite struct {

func (s *StateVerifierTestSuite) SetupTest() {
s.mockStateReader = &ingest.MockChangeReader{}
s.verifier = &StateVerifier{
StateReader: s.mockStateReader,
}
s.verifier = NewStateVerifier(s.mockStateReader, nil)
}

func (s *StateVerifierTestSuite) TearDownTest() {
@@ -103,7 +101,7 @@ func (s *StateVerifierTestSuite) TestTransformFunction() {

s.mockStateReader.On("Read").Return(ingest.Change{}, io.EOF).Once()

s.verifier.TransformFunction =
s.verifier.transformFunction =
func(entry xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) {
// Leave Account ID only for accounts, ignore the rest
switch entry.Data.Type {
@@ -190,7 +188,7 @@ func (s *StateVerifierTestSuite) TestTransformFunctionBuggyIgnore() {
Post: &accountEntry,
}, nil).Once()

s.verifier.TransformFunction =
s.verifier.transformFunction =
func(entry xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) {
return false, xdr.LedgerEntry{}
}
@@ -199,16 +197,16 @@ func (s *StateVerifierTestSuite) TestTransformFunctionBuggyIgnore() {
s.Assert().NoError(err)
s.Assert().Len(keys, 1)

// Check the behaviour of TransformFunction to code path to test.
s.verifier.TransformFunction =
// Check the behaviour of transformFunction to code path to test.
s.verifier.transformFunction =
func(entry xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) {
return true, xdr.LedgerEntry{}
}

entryBase64, err := xdr.MarshalBase64(accountEntry)
s.Assert().NoError(err)
errorMsg := fmt.Sprintf(
"Entry ignored in GetEntries but not ignored in Write: %s. Possibly TransformFunction is buggy.",
"Entry ignored in GetEntries but not ignored in Write: %s. Possibly transformFunction is buggy.",
entryBase64,
)
err = s.verifier.Write(accountEntry)
10 changes: 10 additions & 0 deletions xdr/main.go
Original file line number Diff line number Diff line change
@@ -167,6 +167,16 @@ func (e *EncodingBuffer) UnsafeMarshalHex(encodable XDREncodable) ([]byte, error
return e.otherEncodersBuf, nil
}

func (e *EncodingBuffer) MarshalBinary(encodable XDREncodable) ([]byte, error) {
xdrEncoded, err := e.UnsafeMarshalBinary(encodable)
if err != nil {
return nil, err
}
ret := make([]byte, len(xdrEncoded))
copy(ret, xdrEncoded)
return ret, nil
}

func (e *EncodingBuffer) MarshalBase64(encodable XDREncodable) (string, error) {
b, err := e.UnsafeMarshalBase64(encodable)
if err != nil {