Skip to content

Commit

Permalink
services/horizon/internal/ingest/processors: Refactor effects process…
Browse files Browse the repository at this point in the history
…or to support new ingestion data flow (#5028)
  • Loading branch information
tamirms authored Aug 31, 2023
1 parent 21d016f commit 8775648
Show file tree
Hide file tree
Showing 8 changed files with 474 additions and 437 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"

"github.com/guregu/null"

"github.com/stellar/go/support/db"
)

// EffectBatchInsertBuilder is used to insert effects into the
// history_effects table
type EffectBatchInsertBuilder interface {
Add(
accountID int64,
accountID FutureAccountID,
muxedAccount null.String,
operationID int64,
order uint32,
Expand All @@ -37,7 +38,7 @@ func (q *Q) NewEffectBatchInsertBuilder() EffectBatchInsertBuilder {

// Add adds a effect to the batch
func (i *effectBatchInsertBuilder) Add(
accountID int64,
accountID FutureAccountID,
muxedAccount null.String,
operationID int64,
order uint32,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/guregu/null"

"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/toid"
)
Expand All @@ -18,8 +19,7 @@ func TestAddEffect(t *testing.T) {

address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY"
muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26"
accounIDs, err := q.CreateAccounts(tt.Ctx, []string{address}, 1)
tt.Assert.NoError(err)
accountLoader := NewAccountLoader()

builder := q.NewEffectBatchInsertBuilder()
sequence := int32(56)
Expand All @@ -29,7 +29,7 @@ func TestAddEffect(t *testing.T) {
})

err = builder.Add(
accounIDs[address],
accountLoader.GetFuture(address),
null.StringFrom(muxedAddres),
toid.New(sequence, 1, 1).ToInt64(),
1,
Expand All @@ -38,6 +38,7 @@ func TestAddEffect(t *testing.T) {
)
tt.Assert.NoError(err)

tt.Assert.NoError(accountLoader.Exec(tt.Ctx, q))
tt.Assert.NoError(builder.Exec(tt.Ctx, q))
tt.Assert.NoError(q.Commit())

Expand Down
35 changes: 15 additions & 20 deletions services/horizon/internal/db2/history/effect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,27 @@ func TestEffectsForLiquidityPool(t *testing.T) {
// Insert Effect
address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY"
muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26"
accountIDs, err := q.CreateAccounts(tt.Ctx, []string{address}, 1)
tt.Assert.NoError(err)
accountLoader := NewAccountLoader()

builder := q.NewEffectBatchInsertBuilder()
sequence := int32(56)
details, err := json.Marshal(map[string]string{
"amount": "1000.0000000",
"asset_type": "native",
})
tt.Assert.NoError(err)
opID := toid.New(sequence, 1, 1).ToInt64()
err = builder.Add(
accountIDs[address],
tt.Assert.NoError(builder.Add(
accountLoader.GetFuture(address),
null.StringFrom(muxedAddres),
opID,
1,
3,
details,
)
tt.Assert.NoError(err)
))

err = builder.Exec(tt.Ctx, q)
tt.Assert.NoError(err)
tt.Assert.NoError(accountLoader.Exec(tt.Ctx, q))
tt.Assert.NoError(builder.Exec(tt.Ctx, q))

// Insert Liquidity Pool history
liquidityPoolID := "abcde"
Expand Down Expand Up @@ -79,8 +78,7 @@ func TestEffectsForTrustlinesSponsorshipEmptyAssetType(t *testing.T) {

address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY"
muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26"
accountIDs, err := q.CreateAccounts(tt.Ctx, []string{address}, 1)
tt.Assert.NoError(err)
accountLoader := NewAccountLoader()

builder := q.NewEffectBatchInsertBuilder()
sequence := int32(56)
Expand Down Expand Up @@ -142,27 +140,24 @@ func TestEffectsForTrustlinesSponsorshipEmptyAssetType(t *testing.T) {

for i, test := range tests {
var bytes []byte
bytes, err = json.Marshal(test.details)
bytes, err := json.Marshal(test.details)
tt.Require.NoError(err)

err = builder.Add(
accountIDs[address],
tt.Require.NoError(builder.Add(
accountLoader.GetFuture(address),
null.StringFrom(muxedAddres),
opID,
uint32(i),
test.effectType,
bytes,
)
tt.Require.NoError(err)
))
}

err = builder.Exec(tt.Ctx, q)
tt.Require.NoError(err)
tt.Require.NoError(accountLoader.Exec(tt.Ctx, q))
tt.Require.NoError(builder.Exec(tt.Ctx, q))
tt.Assert.NoError(q.Commit())

var results []Effect
err = q.Effects().Select(tt.Ctx, &results)
tt.Require.NoError(err)
tt.Require.NoError(q.Effects().Select(tt.Ctx, &results))
tt.Require.Len(results, len(tests))

for i, test := range tests {
Expand Down
9 changes: 5 additions & 4 deletions services/horizon/internal/db2/history/fee_bump_scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (
sq "github.com/Masterminds/squirrel"
"github.com/guregu/null"

"github.com/stretchr/testify/assert"

"github.com/stellar/go/ingest"
"github.com/stellar/go/network"
"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/toid"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/assert"
)

func ledgerToMap(ledger Ledger) map[string]interface{} {
Expand Down Expand Up @@ -285,18 +286,18 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture {
details, err = json.Marshal(map[string]interface{}{"new_seq": 98})
tt.Assert.NoError(err)

accounIDs, err := q.CreateAccounts(ctx, []string{account.Address()}, 1)
tt.Assert.NoError(err)
accountLoader := NewAccountLoader()

err = effectBuilder.Add(
accounIDs[account.Address()],
accountLoader.GetFuture(account.Address()),
null.String{},
toid.New(fixture.Ledger.Sequence, 1, 1).ToInt64(),
1,
EffectSequenceBumped,
details,
)
tt.Assert.NoError(err)
tt.Assert.NoError(accountLoader.Exec(ctx, q))
tt.Assert.NoError(effectBuilder.Exec(ctx, q))

tt.Assert.NoError(q.Commit())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type MockEffectBatchInsertBuilder struct {

// Add mock
func (m *MockEffectBatchInsertBuilder) Add(
accountID int64,
accountID FutureAccountID,
muxedAccount null.String,
operationID int64,
order uint32,
Expand Down
Loading

0 comments on commit 8775648

Please sign in to comment.