Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integrating new loaders and builders into processors #5083

Merged
merged 14 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion services/horizon/docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3'
services:
horizon-postgres:
image: postgres:9.6.17-alpine
image: postgres:postgres:12-bullseye
restart: on-failure
environment:
- POSTGRES_HOST_AUTH_METHOD=trust
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/actions/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func checkOuterHashResponse(
}

func TestFeeBumpTransactionPage(t *testing.T) {

tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
Expand Down
18 changes: 13 additions & 5 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const loaderLookupBatchSize = 50000

// Value implements the database/sql/driver Valuer interface.
func (a FutureAccountID) Value() (driver.Value, error) {
return a.loader.GetNow(a.address), nil
return a.loader.GetNow(a.address)
}

// AccountLoader will map account addresses to their history
Expand Down Expand Up @@ -71,11 +71,15 @@ func (a *AccountLoader) GetFuture(address string) FutureAccountID {
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AccountLoader) GetNow(address string) int64 {
if id, ok := a.ids[address]; !ok {
panic(fmt.Errorf("address %v not present", address))
func (a *AccountLoader) GetNow(address string) (int64, error) {
if !a.sealed {
return 0, fmt.Errorf(`invalid account loader state,
Exec was not called yet to properly seal and resolve %v id`, address)
}
if internalID, ok := a.ids[address]; !ok {
return 0, fmt.Errorf(`account loader address %q was not found`, address)
} else {
return id
return internalID, nil
}
}

Expand Down Expand Up @@ -207,3 +211,7 @@ func NewAccountLoaderStub() AccountLoaderStub {
func (a AccountLoaderStub) Insert(address string, id int64) {
a.Loader.ids[address] = id
}

func (a AccountLoaderStub) Sealed() {
sreuland marked this conversation as resolved.
Show resolved Hide resolved
a.Loader.sealed = true
}
24 changes: 10 additions & 14 deletions services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,11 @@
}

loader := NewAccountLoader()
var futures []FutureAccountID
for _, address := range addresses {
future := loader.GetFuture(address)
futures = append(futures, future)
assert.Panics(t, func() {
loader.GetNow(address)
})
assert.Panics(t, func() {
future.Value()
})
_, err := future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid account loader state,`)
duplicateFuture := loader.GetFuture(address)
assert.Equal(t, future, duplicateFuture)
}
Expand All @@ -42,15 +37,16 @@
})

q := &Q{session}
for i, address := range addresses {
future := futures[i]
id := loader.GetNow(address)
val, err := future.Value()
for _, address := range addresses {
internalId, err := loader.GetNow(address)

Check failure on line 41 in services/horizon/internal/db2/history/account_loader_test.go

View workflow job for this annotation

GitHub Actions / golangci

ST1003: var internalId should be internalID (stylecheck)
assert.NoError(t, err)
assert.Equal(t, id, val)
var account Account
assert.NoError(t, q.AccountByAddress(context.Background(), &account, address))
assert.Equal(t, account.ID, id)
assert.Equal(t, account.ID, internalId)
assert.Equal(t, account.Address, address)
}

_, err := loader.GetNow("not present")
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
44 changes: 24 additions & 20 deletions services/horizon/internal/db2/history/asset_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql/driver"
"fmt"
"sort"
"strings"

sq "github.com/Masterminds/squirrel"

Expand All @@ -21,11 +22,15 @@ type AssetKey struct {
Issuer string
}

func (key AssetKey) String() string {
return key.Type + "/" + key.Code + "/" + key.Issuer
sreuland marked this conversation as resolved.
Show resolved Hide resolved
}

// AssetKeyFromXDR constructs an AssetKey from an xdr asset
func AssetKeyFromXDR(asset xdr.Asset) AssetKey {
return AssetKey{
Type: xdr.AssetTypeToString[asset.Type],
Code: asset.GetCode(),
Code: strings.TrimRight(asset.GetCode(), "\x00"),
Issuer: asset.GetIssuer(),
}
}
Expand All @@ -41,7 +46,7 @@ type FutureAssetID struct {

// Value implements the database/sql/driver Valuer interface.
func (a FutureAssetID) Value() (driver.Value, error) {
return a.loader.GetNow(a.asset), nil
return a.loader.GetNow(a.asset)
}

// AssetLoader will map assets to their history
Expand Down Expand Up @@ -81,11 +86,15 @@ func (a *AssetLoader) GetFuture(asset AssetKey) FutureAssetID {
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AssetLoader) GetNow(asset AssetKey) int64 {
if id, ok := a.ids[asset]; !ok {
panic(fmt.Errorf("asset %v not present", asset))
func (a *AssetLoader) GetNow(asset AssetKey) (int64, error) {
if !a.sealed {
return 0, fmt.Errorf(`invalid asset loader state,
Exec was not called yet to properly seal and resolve %v id`, asset)
}
if internalID, ok := a.ids[asset]; !ok {
return 0, fmt.Errorf(`asset loader id %v was not found`, asset)
} else {
return id
return internalID, nil
}
}

Expand Down Expand Up @@ -137,6 +146,11 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err
assetTypes := make([]string, 0, len(a.set)-len(a.ids))
assetCodes := make([]string, 0, len(a.set)-len(a.ids))
assetIssuers := make([]string, 0, len(a.set)-len(a.ids))
// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Slice(keys, func(i, j int) bool {
return keys[i].String() < keys[j].String()
})
insert := 0
for _, key := range keys {
if _, ok := a.ids[key]; ok {
Expand All @@ -152,20 +166,6 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err
return nil
}
keys = keys[:insert]
// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Slice(keys, func(i, j int) bool {
if keys[i].Type < keys[j].Type {
return true
}
if keys[i].Code < keys[j].Code {
return true
}
if keys[i].Issuer < keys[j].Issuer {
return true
}
return false
})

err := bulkInsert(
ctx,
Expand Down Expand Up @@ -213,3 +213,7 @@ func NewAssetLoaderStub() AssetLoaderStub {
func (a AssetLoaderStub) Insert(asset AssetKey, id int64) {
a.Loader.ids[asset] = id
}

func (a AssetLoaderStub) Sealed() {
a.Loader.sealed = true
}
45 changes: 25 additions & 20 deletions services/horizon/internal/db2/history/asset_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,34 @@ func TestAssetLoader(t *testing.T) {
for i := 0; i < 100; i++ {
var key AssetKey
if i == 0 {
key.Type = "native"
key = AssetKeyFromXDR(xdr.Asset{Type: xdr.AssetTypeAssetTypeNative})
} else if i%2 == 0 {
key.Type = "credit_alphanum4"
key.Code = fmt.Sprintf("ab%d", i)
key.Issuer = keypair.MustRandom().Address()
code := [4]byte{0, 0, 0, 0}
copy(code[:], fmt.Sprintf("ab%d", i))
key = AssetKeyFromXDR(xdr.Asset{
Type: xdr.AssetTypeAssetTypeCreditAlphanum4,
AlphaNum4: &xdr.AlphaNum4{
AssetCode: code,
Issuer: xdr.MustAddress(keypair.MustRandom().Address())}})
} else {
key.Type = "credit_alphanum12"
key.Code = fmt.Sprintf("abcdef%d", i)
key.Issuer = keypair.MustRandom().Address()
code := [12]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
copy(code[:], fmt.Sprintf("abcdef%d", i))
key = AssetKeyFromXDR(xdr.Asset{
Type: xdr.AssetTypeAssetTypeCreditAlphanum12,
AlphaNum12: &xdr.AlphaNum12{
AssetCode: code,
Issuer: xdr.MustAddress(keypair.MustRandom().Address())}})

}
keys = append(keys, key)
}

loader := NewAssetLoader()
var futures []FutureAssetID
for _, key := range keys {
future := loader.GetFuture(key)
futures = append(futures, future)
assert.Panics(t, func() {
loader.GetNow(key)
})
assert.Panics(t, func() {
future.Value()
})
_, err := future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid asset loader state,`)
duplicateFuture := loader.GetFuture(key)
assert.Equal(t, future, duplicateFuture)
}
Expand All @@ -56,12 +60,9 @@ func TestAssetLoader(t *testing.T) {
})

q := &Q{session}
for i, key := range keys {
future := futures[i]
internalID := loader.GetNow(key)
val, err := future.Value()
for _, key := range keys {
internalID, err := loader.GetNow(key)
assert.NoError(t, err)
assert.Equal(t, internalID, val)
var assetXDR xdr.Asset
if key.Type == "native" {
assetXDR = xdr.MustNewNativeAsset()
Expand All @@ -72,4 +73,8 @@ func TestAssetLoader(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, assetID, internalID)
}

_, err := loader.GetNow(AssetKey{})
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type FutureClaimableBalanceID struct {

// Value implements the database/sql/driver Valuer interface.
func (a FutureClaimableBalanceID) Value() (driver.Value, error) {
return a.loader.getNow(a.id), nil
return a.loader.getNow(a.id)
}

// ClaimableBalanceLoader will map claimable balance ids to their internal
Expand Down Expand Up @@ -64,11 +64,15 @@ func (a *ClaimableBalanceLoader) GetFuture(id string) FutureClaimableBalanceID {
// getNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any getNow
// call can succeed.
func (a *ClaimableBalanceLoader) getNow(id string) int64 {
func (a *ClaimableBalanceLoader) getNow(id string) (int64, error) {
if !a.sealed {
return 0, fmt.Errorf(`invalid claimable balance loader state,
Exec was not called yet to properly seal and resolve %v id`, id)
}
if internalID, ok := a.ids[id]; !ok {
panic(fmt.Errorf("id %v not present", id))
return 0, fmt.Errorf(`claimable balance loader id %q was not found`, id)
} else {
return internalID
return internalID, nil
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,9 @@ func TestClaimableBalanceLoader(t *testing.T) {
for _, id := range ids {
future := loader.GetFuture(id)
futures = append(futures, future)
assert.Panics(t, func() {
loader.getNow(id)
})
assert.Panics(t, func() {
future.Value()
})
_, err := future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid claimable balance loader state,`)
duplicateFuture := loader.GetFuture(id)
assert.Equal(t, future, duplicateFuture)
}
Expand All @@ -50,13 +47,16 @@ func TestClaimableBalanceLoader(t *testing.T) {
q := &Q{session}
for i, id := range ids {
future := futures[i]
internalID := loader.getNow(id)
val, err := future.Value()
internalID, err := future.Value()
assert.NoError(t, err)
assert.Equal(t, internalID, val)
cb, err := q.ClaimableBalanceByID(context.Background(), id)
assert.NoError(t, err)
assert.Equal(t, cb.BalanceID, id)
assert.Equal(t, cb.InternalID, internalID)
}

futureCb := &FutureClaimableBalanceID{id: "not-present", loader: loader}
_, err := futureCb.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
8 changes: 6 additions & 2 deletions services/horizon/internal/db2/history/fee_bump_scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -269,6 +270,8 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture {
details, err := json.Marshal(map[string]string{
"bump_to": "98",
})

fmt.Print(string(details))
sreuland marked this conversation as resolved.
Show resolved Hide resolved
tt.Assert.NoError(err)

tt.Assert.NoError(opBuilder.Add(
Expand Down Expand Up @@ -296,9 +299,10 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture {
EffectSequenceBumped,
details,
)

tt.Assert.NoError(err)
tt.Assert.NoError(accountLoader.Exec(ctx, q))
tt.Assert.NoError(effectBuilder.Exec(ctx, q))
tt.Assert.NoError(accountLoader.Exec(ctx, q.SessionInterface))
tt.Assert.NoError(effectBuilder.Exec(ctx, q.SessionInterface))

tt.Assert.NoError(q.Commit())

Expand Down
18 changes: 13 additions & 5 deletions services/horizon/internal/db2/history/liquidity_pool_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type FutureLiquidityPoolID struct {

// Value implements the database/sql/driver Valuer interface.
func (a FutureLiquidityPoolID) Value() (driver.Value, error) {
return a.loader.GetNow(a.id), nil
return a.loader.GetNow(a.id)
}

// LiquidityPoolLoader will map liquidity pools to their internal
Expand Down Expand Up @@ -64,11 +64,15 @@ func (a *LiquidityPoolLoader) GetFuture(id string) FutureLiquidityPoolID {
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *LiquidityPoolLoader) GetNow(id string) int64 {
if id, ok := a.ids[id]; !ok {
panic(fmt.Errorf("id %v not present", id))
func (a *LiquidityPoolLoader) GetNow(id string) (int64, error) {
if !a.sealed {
return 0, fmt.Errorf(`invalid liquidity pool loader state,
Exec was not called yet to properly seal and resolve %v id`, id)
}
if internalID, ok := a.ids[id]; !ok {
return 0, fmt.Errorf(`liquidity pool loader id %q was not found`, id)
} else {
return id
return internalID, nil
}
}

Expand Down Expand Up @@ -158,3 +162,7 @@ func NewLiquidityPoolLoaderStub() LiquidityPoolLoaderStub {
func (a LiquidityPoolLoaderStub) Insert(lp string, id int64) {
a.Loader.ids[lp] = id
}

func (a LiquidityPoolLoaderStub) Sealed() {
a.Loader.sealed = true
}
Loading
Loading