Skip to content

Commit

Permalink
integrating new loaders and builders into processors (#5083)
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland authored Oct 26, 2023
1 parent 8775648 commit 98c135c
Show file tree
Hide file tree
Showing 32 changed files with 467 additions and 372 deletions.
2 changes: 1 addition & 1 deletion services/horizon/docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3'
services:
horizon-postgres:
image: postgres:9.6.17-alpine
image: postgres:postgres:12-bullseye
restart: on-failure
environment:
- POSTGRES_HOST_AUTH_METHOD=trust
Expand Down
4 changes: 1 addition & 3 deletions services/horizon/docker/verify-range/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
FROM ubuntu:20.04

MAINTAINER Bartek Nowotarski <bartek@stellar.org>

ARG STELLAR_CORE_VERSION
ENV STELLAR_CORE_VERSION=${STELLAR_CORE_VERSION:-*}
# to remove tzdata interactive flow
ENV DEBIAN_FRONTEND=noninteractive

ADD dependencies /
RUN ["chmod", "+x", "dependencies"]
RUN ["chmod", "+x", "/dependencies"]
RUN /dependencies

ADD stellar-core.cfg /
Expand Down
6 changes: 3 additions & 3 deletions services/horizon/docker/verify-range/dependencies
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ echo "deb https://apt.stellar.org $(lsb_release -cs) stable" | sudo tee -a /etc/
apt-get update
apt-get install -y stellar-core=${STELLAR_CORE_VERSION}

wget -q https://dl.google.com/go/go1.18.linux-amd64.tar.gz
tar -C /usr/local -xzf go1.18.linux-amd64.tar.gz
wget -q https://dl.google.com/go/go1.20.linux-amd64.tar.gz
tar -C /usr/local -xzf go1.20.linux-amd64.tar.gz

git clone https://github.com/stellar/go.git stellar-go
cd stellar-go
# By default "git fetch" only fetches refs/<branchname>
# Below ensures we also fetch PR refs
git config --add remote.origin.fetch "+refs/pull/*/head:refs/remotes/origin/pull/*"
git fetch --force --quiet origin
/usr/local/go/bin/go build -v ./services/horizon
/usr/local/go/bin/go build -v ./services/horizon/.
1 change: 1 addition & 0 deletions services/horizon/internal/actions/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func checkOuterHashResponse(
}

func TestFeeBumpTransactionPage(t *testing.T) {

tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
Expand Down
15 changes: 10 additions & 5 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const loaderLookupBatchSize = 50000

// Value implements the database/sql/driver Valuer interface.
func (a FutureAccountID) Value() (driver.Value, error) {
return a.loader.GetNow(a.address), nil
return a.loader.GetNow(a.address)
}

// AccountLoader will map account addresses to their history
Expand Down Expand Up @@ -71,11 +71,15 @@ func (a *AccountLoader) GetFuture(address string) FutureAccountID {
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AccountLoader) GetNow(address string) int64 {
if id, ok := a.ids[address]; !ok {
panic(fmt.Errorf("address %v not present", address))
func (a *AccountLoader) GetNow(address string) (int64, error) {
if !a.sealed {
return 0, fmt.Errorf(`invalid account loader state,
Exec was not called yet to properly seal and resolve %v id`, address)
}
if internalID, ok := a.ids[address]; !ok {
return 0, fmt.Errorf(`account loader address %q was not found`, address)
} else {
return id
return internalID, nil
}
}

Expand Down Expand Up @@ -205,5 +209,6 @@ func NewAccountLoaderStub() AccountLoaderStub {
// Insert updates the wrapped AccountLoader so that the given account
// address is mapped to the provided history account id
func (a AccountLoaderStub) Insert(address string, id int64) {
a.Loader.sealed = true
a.Loader.ids[address] = id
}
24 changes: 10 additions & 14 deletions services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,11 @@ func TestAccountLoader(t *testing.T) {
}

loader := NewAccountLoader()
var futures []FutureAccountID
for _, address := range addresses {
future := loader.GetFuture(address)
futures = append(futures, future)
assert.Panics(t, func() {
loader.GetNow(address)
})
assert.Panics(t, func() {
future.Value()
})
_, err := future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid account loader state,`)
duplicateFuture := loader.GetFuture(address)
assert.Equal(t, future, duplicateFuture)
}
Expand All @@ -42,15 +37,16 @@ func TestAccountLoader(t *testing.T) {
})

q := &Q{session}
for i, address := range addresses {
future := futures[i]
id := loader.GetNow(address)
val, err := future.Value()
for _, address := range addresses {
internalId, err := loader.GetNow(address)
assert.NoError(t, err)
assert.Equal(t, id, val)
var account Account
assert.NoError(t, q.AccountByAddress(context.Background(), &account, address))
assert.Equal(t, account.ID, id)
assert.Equal(t, account.ID, internalId)
assert.Equal(t, account.Address, address)
}

_, err := loader.GetNow("not present")
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
44 changes: 24 additions & 20 deletions services/horizon/internal/db2/history/asset_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql/driver"
"fmt"
"sort"
"strings"

sq "github.com/Masterminds/squirrel"

Expand All @@ -21,11 +22,18 @@ type AssetKey struct {
Issuer string
}

func (key AssetKey) String() string {
if key.Type == xdr.AssetTypeToString[xdr.AssetTypeAssetTypeNative] {
return key.Type
}
return key.Type + "/" + key.Code + "/" + key.Issuer
}

// AssetKeyFromXDR constructs an AssetKey from an xdr asset
func AssetKeyFromXDR(asset xdr.Asset) AssetKey {
return AssetKey{
Type: xdr.AssetTypeToString[asset.Type],
Code: asset.GetCode(),
Code: strings.TrimRight(asset.GetCode(), "\x00"),
Issuer: asset.GetIssuer(),
}
}
Expand All @@ -41,7 +49,7 @@ type FutureAssetID struct {

// Value implements the database/sql/driver Valuer interface.
func (a FutureAssetID) Value() (driver.Value, error) {
return a.loader.GetNow(a.asset), nil
return a.loader.GetNow(a.asset)
}

// AssetLoader will map assets to their history
Expand Down Expand Up @@ -81,11 +89,15 @@ func (a *AssetLoader) GetFuture(asset AssetKey) FutureAssetID {
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AssetLoader) GetNow(asset AssetKey) int64 {
if id, ok := a.ids[asset]; !ok {
panic(fmt.Errorf("asset %v not present", asset))
func (a *AssetLoader) GetNow(asset AssetKey) (int64, error) {
if !a.sealed {
return 0, fmt.Errorf(`invalid asset loader state,
Exec was not called yet to properly seal and resolve %v id`, asset)
}
if internalID, ok := a.ids[asset]; !ok {
return 0, fmt.Errorf(`asset loader id %v was not found`, asset)
} else {
return id
return internalID, nil
}
}

Expand Down Expand Up @@ -137,6 +149,11 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err
assetTypes := make([]string, 0, len(a.set)-len(a.ids))
assetCodes := make([]string, 0, len(a.set)-len(a.ids))
assetIssuers := make([]string, 0, len(a.set)-len(a.ids))
// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Slice(keys, func(i, j int) bool {
return keys[i].String() < keys[j].String()
})
insert := 0
for _, key := range keys {
if _, ok := a.ids[key]; ok {
Expand All @@ -152,20 +169,6 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err
return nil
}
keys = keys[:insert]
// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Slice(keys, func(i, j int) bool {
if keys[i].Type < keys[j].Type {
return true
}
if keys[i].Code < keys[j].Code {
return true
}
if keys[i].Issuer < keys[j].Issuer {
return true
}
return false
})

err := bulkInsert(
ctx,
Expand Down Expand Up @@ -211,5 +214,6 @@ func NewAssetLoaderStub() AssetLoaderStub {
// Insert updates the wrapped AssetLoaderStub so that the given asset
// address is mapped to the provided history asset id
func (a AssetLoaderStub) Insert(asset AssetKey, id int64) {
a.Loader.sealed = true
a.Loader.ids[asset] = id
}
67 changes: 47 additions & 20 deletions services/horizon/internal/db2/history/asset_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,28 @@ import (
"github.com/stellar/go/xdr"
)

func TestAssetKeyToString(t *testing.T) {
num4key := AssetKey{
Type: "credit_alphanum4",
Code: "USD",
Issuer: "A1B2C3",
}

num12key := AssetKey{
Type: "credit_alphanum12",
Code: "USDABC",
Issuer: "A1B2C3",
}

nativekey := AssetKey{
Type: "native",
}

assert.Equal(t, num4key.String(), "credit_alphanum4/USD/A1B2C3")
assert.Equal(t, num12key.String(), "credit_alphanum12/USDABC/A1B2C3")
assert.Equal(t, nativekey.String(), "native")
}

func TestAssetLoader(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
Expand All @@ -22,30 +44,34 @@ func TestAssetLoader(t *testing.T) {
for i := 0; i < 100; i++ {
var key AssetKey
if i == 0 {
key.Type = "native"
key = AssetKeyFromXDR(xdr.Asset{Type: xdr.AssetTypeAssetTypeNative})
} else if i%2 == 0 {
key.Type = "credit_alphanum4"
key.Code = fmt.Sprintf("ab%d", i)
key.Issuer = keypair.MustRandom().Address()
code := [4]byte{0, 0, 0, 0}
copy(code[:], fmt.Sprintf("ab%d", i))
key = AssetKeyFromXDR(xdr.Asset{
Type: xdr.AssetTypeAssetTypeCreditAlphanum4,
AlphaNum4: &xdr.AlphaNum4{
AssetCode: code,
Issuer: xdr.MustAddress(keypair.MustRandom().Address())}})
} else {
key.Type = "credit_alphanum12"
key.Code = fmt.Sprintf("abcdef%d", i)
key.Issuer = keypair.MustRandom().Address()
code := [12]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
copy(code[:], fmt.Sprintf("abcdef%d", i))
key = AssetKeyFromXDR(xdr.Asset{
Type: xdr.AssetTypeAssetTypeCreditAlphanum12,
AlphaNum12: &xdr.AlphaNum12{
AssetCode: code,
Issuer: xdr.MustAddress(keypair.MustRandom().Address())}})

}
keys = append(keys, key)
}

loader := NewAssetLoader()
var futures []FutureAssetID
for _, key := range keys {
future := loader.GetFuture(key)
futures = append(futures, future)
assert.Panics(t, func() {
loader.GetNow(key)
})
assert.Panics(t, func() {
future.Value()
})
_, err := future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid asset loader state,`)
duplicateFuture := loader.GetFuture(key)
assert.Equal(t, future, duplicateFuture)
}
Expand All @@ -56,12 +82,9 @@ func TestAssetLoader(t *testing.T) {
})

q := &Q{session}
for i, key := range keys {
future := futures[i]
internalID := loader.GetNow(key)
val, err := future.Value()
for _, key := range keys {
internalID, err := loader.GetNow(key)
assert.NoError(t, err)
assert.Equal(t, internalID, val)
var assetXDR xdr.Asset
if key.Type == "native" {
assetXDR = xdr.MustNewNativeAsset()
Expand All @@ -72,4 +95,8 @@ func TestAssetLoader(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, assetID, internalID)
}

_, err := loader.GetNow(AssetKey{})
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type FutureClaimableBalanceID struct {

// Value implements the database/sql/driver Valuer interface.
func (a FutureClaimableBalanceID) Value() (driver.Value, error) {
return a.loader.getNow(a.id), nil
return a.loader.getNow(a.id)
}

// ClaimableBalanceLoader will map claimable balance ids to their internal
Expand Down Expand Up @@ -64,11 +64,15 @@ func (a *ClaimableBalanceLoader) GetFuture(id string) FutureClaimableBalanceID {
// getNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any getNow
// call can succeed.
func (a *ClaimableBalanceLoader) getNow(id string) int64 {
func (a *ClaimableBalanceLoader) getNow(id string) (int64, error) {
if !a.sealed {
return 0, fmt.Errorf(`invalid claimable balance loader state,
Exec was not called yet to properly seal and resolve %v id`, id)
}
if internalID, ok := a.ids[id]; !ok {
panic(fmt.Errorf("id %v not present", id))
return 0, fmt.Errorf(`claimable balance loader id %q was not found`, id)
} else {
return internalID
return internalID, nil
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,9 @@ func TestClaimableBalanceLoader(t *testing.T) {
for _, id := range ids {
future := loader.GetFuture(id)
futures = append(futures, future)
assert.Panics(t, func() {
loader.getNow(id)
})
assert.Panics(t, func() {
future.Value()
})
_, err := future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid claimable balance loader state,`)
duplicateFuture := loader.GetFuture(id)
assert.Equal(t, future, duplicateFuture)
}
Expand All @@ -50,13 +47,16 @@ func TestClaimableBalanceLoader(t *testing.T) {
q := &Q{session}
for i, id := range ids {
future := futures[i]
internalID := loader.getNow(id)
val, err := future.Value()
internalID, err := future.Value()
assert.NoError(t, err)
assert.Equal(t, internalID, val)
cb, err := q.ClaimableBalanceByID(context.Background(), id)
assert.NoError(t, err)
assert.Equal(t, cb.BalanceID, id)
assert.Equal(t, cb.InternalID, internalID)
}

futureCb := &FutureClaimableBalanceID{id: "not-present", loader: loader}
_, err := futureCb.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
Loading

0 comments on commit 98c135c

Please sign in to comment.