Skip to content

Commit

Permalink
Inital commit of the processors package (#5567)
Browse files Browse the repository at this point in the history
  • Loading branch information
chowbao authored Jan 10, 2025
1 parent bd9fa3f commit c29e294
Show file tree
Hide file tree
Showing 43 changed files with 17,023 additions and 3 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ require (

require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da
github.com/docker/docker v27.3.1+incompatible
github.com/docker/go-connections v0.5.0
github.com/fsouza/fake-gcs-server v1.49.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa512G+w+Pxci9hJPB8oMnkcP3iZF38=
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/djherbis/fscache v0.10.1 h1:hDv+RGyvD+UDKyRYuLoVNbuRTnf2SrA2K3VyR1br9lk=
Expand Down
111 changes: 111 additions & 0 deletions ingest/processors/account.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package processors

import (
"fmt"

"github.com/guregu/null/zero"
"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"
)

// TransformAccount converts an account from the history archive ingestion system into a form suitable for BigQuery
func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (AccountOutput, error) {
ledgerEntry, changeType, outputDeleted, err := ExtractEntryFromChange(ledgerChange)
if err != nil {
return AccountOutput{}, err
}

accountEntry, accountFound := ledgerEntry.Data.GetAccount()
if !accountFound {
return AccountOutput{}, fmt.Errorf("could not extract account data from ledger entry; actual type is %s", ledgerEntry.Data.Type)
}

outputID, err := accountEntry.AccountId.GetAddress()
if err != nil {
return AccountOutput{}, err
}

outputBalance := accountEntry.Balance
if outputBalance < 0 {
return AccountOutput{}, fmt.Errorf("balance is negative (%d) for account: %s", outputBalance, outputID)
}

//The V1 struct is the first version of the extender from accountEntry. It contains information on liabilities, and in the future
//more extensions may contain extra information
accountExtensionInfo, V1Found := accountEntry.Ext.GetV1()
var outputBuyingLiabilities, outputSellingLiabilities xdr.Int64
if V1Found {
liabilities := accountExtensionInfo.Liabilities
outputBuyingLiabilities, outputSellingLiabilities = liabilities.Buying, liabilities.Selling
if outputBuyingLiabilities < 0 {
return AccountOutput{}, fmt.Errorf("the buying liabilities count is negative (%d) for account: %s", outputBuyingLiabilities, outputID)
}

if outputSellingLiabilities < 0 {
return AccountOutput{}, fmt.Errorf("the selling liabilities count is negative (%d) for account: %s", outputSellingLiabilities, outputID)
}
}

outputSequenceNumber := int64(accountEntry.SeqNum)
if outputSequenceNumber < 0 {
return AccountOutput{}, fmt.Errorf("account sequence number is negative (%d) for account: %s", outputSequenceNumber, outputID)
}
outputSequenceLedger := accountEntry.SeqLedger()
outputSequenceTime := accountEntry.SeqTime()

outputNumSubentries := uint32(accountEntry.NumSubEntries)

inflationDestAccountID := accountEntry.InflationDest
var outputInflationDest string
if inflationDestAccountID != nil {
outputInflationDest, err = inflationDestAccountID.GetAddress()
if err != nil {
return AccountOutput{}, err
}
}

outputFlags := uint32(accountEntry.Flags)

outputHomeDomain := string(accountEntry.HomeDomain)

outputMasterWeight := int32(accountEntry.MasterKeyWeight())
outputThreshLow := int32(accountEntry.ThresholdLow())
outputThreshMed := int32(accountEntry.ThresholdMedium())
outputThreshHigh := int32(accountEntry.ThresholdHigh())

outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq)

closedAt, err := TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime)
if err != nil {
return AccountOutput{}, err
}

ledgerSequence := header.Header.LedgerSeq

transformedAccount := AccountOutput{
AccountID: outputID,
Balance: ConvertStroopValueToReal(outputBalance),
BuyingLiabilities: ConvertStroopValueToReal(outputBuyingLiabilities),
SellingLiabilities: ConvertStroopValueToReal(outputSellingLiabilities),
SequenceNumber: outputSequenceNumber,
SequenceLedger: zero.IntFrom(int64(outputSequenceLedger)),
SequenceTime: zero.IntFrom(int64(outputSequenceTime)),
NumSubentries: outputNumSubentries,
InflationDestination: outputInflationDest,
Flags: outputFlags,
HomeDomain: outputHomeDomain,
MasterWeight: outputMasterWeight,
ThresholdLow: outputThreshLow,
ThresholdMedium: outputThreshMed,
ThresholdHigh: outputThreshHigh,
LastModifiedLedger: outputLastModifiedLedger,
Sponsor: ledgerEntrySponsorToNullString(ledgerEntry),
NumSponsored: uint32(accountEntry.NumSponsored()),
NumSponsoring: uint32(accountEntry.NumSponsoring()),
LedgerEntryChange: uint32(changeType),
Deleted: outputDeleted,
ClosedAt: closedAt,
LedgerSequence: uint32(ledgerSequence),
}
return transformedAccount, nil
}
54 changes: 54 additions & 0 deletions ingest/processors/account_signer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package processors

import (
"fmt"
"sort"

"github.com/guregu/null"
"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"
)

// TransformAccountSigners converts account signers from the history archive ingestion system into a form suitable for BigQuery
func TransformAccountSigners(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) ([]AccountSignerOutput, error) {
var signers []AccountSignerOutput

ledgerEntry, changeType, outputDeleted, err := ExtractEntryFromChange(ledgerChange)
if err != nil {
return signers, err
}
outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq)
accountEntry, accountFound := ledgerEntry.Data.GetAccount()
if !accountFound {
return signers, fmt.Errorf("could not extract signer data from ledger entry of type: %+v", ledgerEntry.Data.Type)
}

closedAt, err := TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime)
if err != nil {
return signers, err
}

ledgerSequence := header.Header.LedgerSeq

sponsors := accountEntry.SponsorPerSigner()
for signer, weight := range accountEntry.SignerSummary() {
var sponsor null.String
if sponsorDesc, isSponsored := sponsors[signer]; isSponsored {
sponsor = null.StringFrom(sponsorDesc.Address())
}

signers = append(signers, AccountSignerOutput{
AccountID: accountEntry.AccountId.Address(),
Signer: signer,
Weight: weight,
Sponsor: sponsor,
LastModifiedLedger: outputLastModifiedLedger,
LedgerEntryChange: uint32(changeType),
Deleted: outputDeleted,
ClosedAt: closedAt,
LedgerSequence: uint32(ledgerSequence),
})
}
sort.Slice(signers, func(a, b int) bool { return signers[a].Weight < signers[b].Weight })
return signers, nil
}
165 changes: 165 additions & 0 deletions ingest/processors/account_signer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package processors

import (
"fmt"
"testing"
"time"

"github.com/guregu/null"

"github.com/stretchr/testify/assert"

"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"
)

func TestTransformAccountSigner(t *testing.T) {
type inputStruct struct {
injest ingest.Change
}

type transformTest struct {
input inputStruct
wantOutput []AccountSignerOutput
wantErr error
}

hardCodedInput := makeSignersTestInput()
hardCodedOutput := makeSignersTestOutput()

tests := []transformTest{
{
inputStruct{
ingest.Change{
Type: xdr.LedgerEntryTypeOffer,
Pre: nil,
Post: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeOffer,
},
},
},
},
nil, fmt.Errorf("could not extract signer data from ledger entry of type: LedgerEntryTypeOffer"),
},
{
inputStruct{
hardCodedInput,
},
hardCodedOutput, nil,
},
}

for _, test := range tests {
header := xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
ScpValue: xdr.StellarValue{
CloseTime: 1000,
},
LedgerSeq: 10,
},
}
actualOutput, actualError := TransformAccountSigners(test.input.injest, header)
assert.Equal(t, test.wantErr, actualError)
assert.Equal(t, test.wantOutput, actualOutput)
}
}

func makeSignersTestInput() ingest.Change {
sponsor, _ := xdr.AddressToAccountId("GBADGWKHSUFOC4C7E3KXKINZSRX5KPHUWHH67UGJU77LEORGVLQ3BN3B")

var ledgerEntry = xdr.LedgerEntry{
LastModifiedLedgerSeq: 30705278,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: testAccount1ID,
Balance: 10959979,
SeqNum: 117801117454198833,
NumSubEntries: 141,
InflationDest: &testAccount2ID,
Flags: 4,
HomeDomain: "examplehome.com",
Thresholds: xdr.Thresholds([4]byte{2, 1, 3, 5}),
Ext: xdr.AccountEntryExt{
V: 1,
V1: &xdr.AccountEntryExtensionV1{
Liabilities: xdr.Liabilities{
Buying: 1000,
Selling: 1500,
},
Ext: xdr.AccountEntryExtensionV1Ext{
V: 2,
V2: &xdr.AccountEntryExtensionV2{
SignerSponsoringIDs: []xdr.SponsorshipDescriptor{
&sponsor,
nil,
},
},
},
},
},
Signers: []xdr.Signer{
{
Key: xdr.SignerKey{
Type: xdr.SignerKeyTypeSignerKeyTypeEd25519,
Ed25519: &xdr.Uint256{4, 5, 6},
PreAuthTx: nil,
HashX: nil,
},
Weight: 10.0,
}, {
Key: xdr.SignerKey{
Type: xdr.SignerKeyTypeSignerKeyTypeEd25519,
Ed25519: &xdr.Uint256{10, 11, 12},
PreAuthTx: nil,
HashX: nil,
},
Weight: 20.0,
},
},
},
},
}
return ingest.Change{
Type: xdr.LedgerEntryTypeAccount,
Pre: &ledgerEntry,
Post: nil,
}
}

func makeSignersTestOutput() []AccountSignerOutput {
return []AccountSignerOutput{
{
AccountID: testAccount1ID.Address(),
Signer: "GCEODJVUUVYVFD5KT4TOEDTMXQ76OPFOQC2EMYYMLPXQCUVPOB6XRWPQ",
Weight: 2.0,
Sponsor: null.String{},
LastModifiedLedger: 30705278,
LedgerEntryChange: 2,
Deleted: true,
LedgerSequence: 10,
ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC),
}, {
AccountID: testAccount1ID.Address(),
Signer: "GACAKBQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB3BQ",
Weight: 10.0,
Sponsor: null.StringFrom("GBADGWKHSUFOC4C7E3KXKINZSRX5KPHUWHH67UGJU77LEORGVLQ3BN3B"),
LastModifiedLedger: 30705278,
LedgerEntryChange: 2,
Deleted: true,
LedgerSequence: 10,
ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC),
}, {
AccountID: testAccount1ID.Address(),
Signer: "GAFAWDAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABNDC",
Weight: 20.0,
Sponsor: null.String{},
LastModifiedLedger: 30705278,
LedgerEntryChange: 2,
Deleted: true,
LedgerSequence: 10,
ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC),
},
}
}
Loading

0 comments on commit c29e294

Please sign in to comment.