diff --git a/exp/lighthorizon/actions/accounts.go b/exp/lighthorizon/actions/accounts.go new file mode 100644 index 0000000000..7d045ceb74 --- /dev/null +++ b/exp/lighthorizon/actions/accounts.go @@ -0,0 +1,129 @@ +package actions + +import ( + "net/http" + "strconv" + + "github.com/stellar/go/support/log" + + "github.com/stellar/go/exp/lighthorizon/adapters" + "github.com/stellar/go/exp/lighthorizon/services" + hProtocol "github.com/stellar/go/protocols/horizon" + "github.com/stellar/go/protocols/horizon/operations" + "github.com/stellar/go/support/render/hal" + "github.com/stellar/go/toid" +) + +const ( + urlAccountId = "account_id" +) + +func accountRequestParams(w http.ResponseWriter, r *http.Request) (string, pagination) { + var accountId string + var accountErr bool + + if accountId, accountErr = getURLParam(r, urlAccountId); !accountErr { + sendErrorResponse(w, http.StatusBadRequest, "") + return "", pagination{} + } + + paginate, err := paging(r) + if err != nil { + sendErrorResponse(w, http.StatusBadRequest, string(invalidPagingParameters)) + return "", pagination{} + } + + if paginate.Cursor < 1 { + paginate.Cursor = toid.New(1, 1, 1).ToInt64() + } + + if paginate.Limit == 0 { + paginate.Limit = 10 + } + + return accountId, paginate +} + +func NewTXByAccountHandler(lightHorizon services.LightHorizon) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + var accountId string + var paginate pagination + + if accountId, paginate = accountRequestParams(w, r); accountId == "" { + return + } + + page := hal.Page{ + Cursor: strconv.FormatInt(paginate.Cursor, 10), + Order: string(paginate.Order), + Limit: uint64(paginate.Limit), + } + page.Init() + page.FullURL = r.URL + + txns, err := lightHorizon.Transactions.GetTransactionsByAccount(ctx, paginate.Cursor, paginate.Limit, accountId) + if err != nil { + log.Error(err) + sendErrorResponse(w, http.StatusInternalServerError, err.Error()) + return + } + + for _, txn := range txns { + var response hProtocol.Transaction + response, err = adapters.PopulateTransaction(r, &txn) + if err != nil { + log.Error(err) + sendErrorResponse(w, http.StatusInternalServerError, err.Error()) + return + } + + page.Add(response) + } + + page.PopulateLinks() + sendPageResponse(w, page) + } +} + +func NewOpsByAccountHandler(lightHorizon services.LightHorizon) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + var accountId string + var paginate pagination + + if accountId, paginate = accountRequestParams(w, r); accountId == "" { + return + } + + page := hal.Page{ + Cursor: strconv.FormatInt(paginate.Cursor, 10), + Order: string(paginate.Order), + Limit: uint64(paginate.Limit), + } + page.Init() + page.FullURL = r.URL + + ops, err := lightHorizon.Operations.GetOperationsByAccount(ctx, paginate.Cursor, paginate.Limit, accountId) + if err != nil { + log.Error(err) + sendErrorResponse(w, http.StatusInternalServerError, err.Error()) + return + } + + for _, op := range ops { + var response operations.Operation + response, err = adapters.PopulateOperation(r, &op) + if err != nil { + log.Error(err) + sendErrorResponse(w, http.StatusInternalServerError, err.Error()) + return + } + + page.Add(response) + } + + page.PopulateLinks() + sendPageResponse(w, page) + } +} diff --git a/exp/lighthorizon/actions/main.go b/exp/lighthorizon/actions/main.go index 8a5b1f9b4c..c091dad1ab 100644 --- a/exp/lighthorizon/actions/main.go +++ b/exp/lighthorizon/actions/main.go @@ -3,10 +3,14 @@ package actions import ( "embed" "encoding/json" + "fmt" "net/http" "net/url" "strconv" + "github.com/go-chi/chi" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/stellar/go/support/log" "github.com/stellar/go/support/render/hal" ) @@ -14,25 +18,40 @@ import ( var ( //go:embed static staticFiles embed.FS + //lint:ignore U1000 temporary + requestCount = promauto.NewCounter(prometheus.CounterOpts{ + Name: "horizon_lite_request_count", + Help: "How many requests have occurred?", + }) + //lint:ignore U1000 temporary + requestTime = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "horizon_lite_request_duration", + Help: "How long do requests take?", + Buckets: append( + prometheus.LinearBuckets(0, 50, 20), + prometheus.LinearBuckets(1000, 1000, 8)..., + ), + }) ) -type Order string -type ErrorMessage string +type order string +type errorMessage string const ( - OrderAsc Order = "asc" - OrderDesc Order = "desc" + orderAsc order = "asc" + orderDesc order = "desc" ) const ( - ServerError ErrorMessage = "Error: A problem occurred on the server while processing request" - InvalidPagingParameters ErrorMessage = "Error: Invalid paging parameters" + //TODO - refactor to use horizon 'problems' package + serverError errorMessage = "Error: A problem occurred on the server while processing request" + invalidPagingParameters errorMessage = "Error: Invalid paging parameters" ) -type Pagination struct { - Limit int64 +type pagination struct { + Limit uint64 Cursor int64 - Order + Order order } func sendPageResponse(w http.ResponseWriter, page hal.Page) { @@ -47,13 +66,13 @@ func sendPageResponse(w http.ResponseWriter, page hal.Page) { func sendErrorResponse(w http.ResponseWriter, errorCode int, errorMsg string) { if errorMsg != "" { - http.Error(w, errorMsg, errorCode) + http.Error(w, fmt.Sprintf("Error: %s", errorMsg), errorCode) } else { - http.Error(w, string(ServerError), errorCode) + http.Error(w, string(serverError), errorCode) } } -func RequestUnaryParam(r *http.Request, paramName string) (string, error) { +func requestUnaryParam(r *http.Request, paramName string) (string, error) { query, err := url.ParseQuery(r.URL.RawQuery) if err != nil { return "", err @@ -61,34 +80,54 @@ func RequestUnaryParam(r *http.Request, paramName string) (string, error) { return query.Get(paramName), nil } -func Paging(r *http.Request) (Pagination, error) { - paginate := Pagination{ - Order: OrderAsc, +func paging(r *http.Request) (pagination, error) { + paginate := pagination{ + Order: orderAsc, } - if cursorRequested, err := RequestUnaryParam(r, "cursor"); err != nil { - return Pagination{}, err + if cursorRequested, err := requestUnaryParam(r, "cursor"); err != nil { + return pagination{}, err } else if cursorRequested != "" { paginate.Cursor, err = strconv.ParseInt(cursorRequested, 10, 64) if err != nil { - return Pagination{}, err + return pagination{}, err } } - if limitRequested, err := RequestUnaryParam(r, "limit"); err != nil { - return Pagination{}, err + if limitRequested, err := requestUnaryParam(r, "limit"); err != nil { + return pagination{}, err } else if limitRequested != "" { - paginate.Limit, err = strconv.ParseInt(limitRequested, 10, 64) + paginate.Limit, err = strconv.ParseUint(limitRequested, 10, 64) if err != nil { - return Pagination{}, err + return pagination{}, err } } - if orderRequested, err := RequestUnaryParam(r, "order"); err != nil { - return Pagination{}, err - } else if orderRequested != "" && orderRequested == string(OrderDesc) { - paginate.Order = OrderDesc + if orderRequested, err := requestUnaryParam(r, "order"); err != nil { + return pagination{}, err + } else if orderRequested != "" && orderRequested == string(orderDesc) { + paginate.Order = orderDesc } return paginate, nil } + +func getURLParam(r *http.Request, key string) (string, bool) { + rctx := chi.RouteContext(r.Context()) + + if rctx == nil { + return "", false + } + + if len(rctx.URLParams.Keys) != len(rctx.URLParams.Values) { + return "", false + } + + for k := len(rctx.URLParams.Keys) - 1; k >= 0; k-- { + if rctx.URLParams.Keys[k] == key { + return rctx.URLParams.Values[k], true + } + } + + return "", false +} diff --git a/exp/lighthorizon/actions/operation.go b/exp/lighthorizon/actions/operation.go deleted file mode 100644 index 6c64b89d3f..0000000000 --- a/exp/lighthorizon/actions/operation.go +++ /dev/null @@ -1,105 +0,0 @@ -package actions - -import ( - "github.com/stellar/go/support/log" - "io" - "net/http" - "strconv" - - "github.com/stellar/go/exp/lighthorizon/adapters" - "github.com/stellar/go/exp/lighthorizon/archive" - "github.com/stellar/go/exp/lighthorizon/index" - "github.com/stellar/go/protocols/horizon/operations" - "github.com/stellar/go/support/render/hal" - "github.com/stellar/go/toid" -) - -func Operations(archiveWrapper archive.Wrapper, indexStore index.Store) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - // For _links rendering, imitate horizon.stellar.org links for horizon-cmp - r.URL.Scheme = "http" - r.URL.Host = "localhost:8080" - - if r.Method != "GET" { - return - } - - paginate, err := Paging(r) - if err != nil { - sendErrorResponse(w, http.StatusBadRequest, string(InvalidPagingParameters)) - return - } - - if paginate.Cursor < 1 { - paginate.Cursor = toid.New(1, 1, 1).ToInt64() - } - - if paginate.Limit < 1 || paginate.Limit > 200 { - paginate.Limit = 10 - } - - page := hal.Page{ - Cursor: strconv.FormatInt(paginate.Cursor, 10), - Order: string(paginate.Order), - Limit: uint64(paginate.Limit), - } - page.Init() - page.FullURL = r.URL - - // For now, use a query param for now to avoid dragging in chi-router. Not - // really the point of the experiment yet. - account, err := RequestUnaryParam(r, "account") - if err != nil { - log.Error(err) - sendErrorResponse(w, http.StatusInternalServerError, "") - return - } - - if account != "" { - // Skip the cursor ahead to the next active checkpoint for this account - var checkpoint uint32 - checkpoint, err = indexStore.NextActive(account, "all/all", uint32(toid.Parse(paginate.Cursor).LedgerSequence/64)) - if err == io.EOF { - // never active. No results. - page.PopulateLinks() - sendPageResponse(w, page) - return - } else if err != nil { - log.Error(err) - sendErrorResponse(w, http.StatusInternalServerError, "") - return - } - ledger := int32(checkpoint * 64) - if ledger < 0 { - // Check we don't overflow going from uint32 -> int32 - log.Error(err) - sendErrorResponse(w, http.StatusInternalServerError, "") - return - } - paginate.Cursor = toid.New(ledger, 1, 1).ToInt64() - } - - //TODO - implement paginate.Order(asc/desc) - ops, err := archiveWrapper.GetOperations(r.Context(), paginate.Cursor, paginate.Limit) - if err != nil { - log.Error(err) - sendErrorResponse(w, http.StatusInternalServerError, "") - return - } - - for _, op := range ops { - var response operations.Operation - response, err = adapters.PopulateOperation(r, &op) - if err != nil { - log.Error(err) - sendErrorResponse(w, http.StatusInternalServerError, "") - return - } - - page.Add(response) - } - - page.PopulateLinks() - sendPageResponse(w, page) - } -} diff --git a/exp/lighthorizon/actions/static/api_docs.yml b/exp/lighthorizon/actions/static/api_docs.yml index 6bc7eb5e9c..707a67eacb 100644 --- a/exp/lighthorizon/actions/static/api_docs.yml +++ b/exp/lighthorizon/actions/static/api_docs.yml @@ -7,78 +7,13 @@ info: servers: - url: http://localhost:8080/ paths: - /transactions: + /accounts/{account_id}/operations: get: - responses: - '200': - description: OK - headers: {} - content: - application/json: - schema: - $ref: '#/components/schemas/CollectionModel_Tx' - example: - _links: - self: - href: http://localhost:8080/transactions?cursor=&limit=0&order= - next: - href: http://localhost:8080/transactions?cursor=6606621773930497&limit=0&order= - prev: - href: http://localhost:8080/transactions?cursor=6606621773930497&limit=0&order=asc - _embedded: - records: - - memo: xdr.MemoText("psp:1405") - _links: - self: - href: http://localhost:8080/transactions/5fef21d5ef75ecf18d65a160cfab17dca8dbf6dbc4e2fd66a510719ad8dddb09 - account: - href: '' - ledger: - href: '' - operations: - href: '' - effects: - href: http://localhost:8080/transactions/5fef21d5ef75ecf18d65a160cfab17dca8dbf6dbc4e2fd66a510719ad8dddb09/effects - precedes: - href: http://localhost:8080/effects?order=asc&cursor=6606621773930497 - succeeds: - href: http://localhost:8080/effects?order=desc&cursor=6606621773930497 - transaction: - href: '' - id: 5fef21d5ef75ecf18d65a160cfab17dca8dbf6dbc4e2fd66a510719ad8dddb09 - paging_token: '6606621773930497' - successful: false - hash: 5fef21d5ef75ecf18d65a160cfab17dca8dbf6dbc4e2fd66a510719ad8dddb09 - ledger: 1538224 - created_at: '2022-06-17T23:29:42Z' - source_account: GCFJN22UG6IZHXKDVAJWAVEQ3NERGCRCURR2FHARNRBNLYFEQZGML4PW - source_account_sequence: '' - fee_account: '' - fee_charged: '3000' - max_fee: '0' - operation_count: 1 - envelope_xdr: AAAAAgAAAACKlutUN5GT3UOoE2BUkNtJEwoipGOinBFsQtXgpIZMxQAAJxAAE05oAAHUKAAAAAEAAAAAAAAAAAAAAABirQ6AAAAAAQAAAAhwc3A6MTQwNQAAAAEAAAAAAAAAAQAAAADpPdN37FA9KVcJfmMBuD8pPcaT5jqlrMeYEOTP36Zo2AAAAAJBVE1ZUgAAAAAAAAAAAAAAZ8rWY3iaDnWNtfpvLpNaCEbKdDjrd2gQODOuKpmj1vMAAAAAGHAagAAAAAAAAAABpIZMxQAAAEDNJwYToiBR6bzElRL4ORJdXXZYO9cE3-ishQLC_fWGrPGhWrW7_UkPJWvxWdQDJBjVOHuA1Jjc94NSe91hSwEL - result_xdr: AAAAAAAAC7j_____AAAAAQAAAAAAAAAB____-gAAAAA= - result_meta_xdr: '' - fee_meta_xdr: '' - memo_type: MemoTypeMemoText - signatures: - - pIZMxQAAAEDNJwYToiBR6bzElRL4ORJdXXZYO9cE3-ishQLC_fWGrPGhWrW7_UkPJWvxWdQDJBjVOHuA1Jjc94NSe91hSwEL - summary: Get Transactions by paged list - operationId: GetTransactions - description: Retrieve transactcions by paged listing. - tags: [] - parameters: + operationId: GetOperationsByAccountId + parameters: - $ref: '#/components/parameters/CursorParam' - $ref: '#/components/parameters/LimitParam' - - in: query - name: id - required: false - schema: - type: string - description: The transaction ID, the hash value. - /operations: - get: + - $ref: '#/components/parameters/AccountIDParam' responses: '200': description: OK @@ -90,24 +25,16 @@ paths: example: _links: self: - href: http://localhost:8080/operations?cursor=6606617478959105&limit=1&order=asc + href: http://localhost:8080/accounts/GDMQQNJM4UL7QIA66P7R2PZHMQINWZBM77BEBMHLFXD5JEUAHGJ7R4JZ/operations?cursor=6606617478959105&limit=1&order=asc next: - href: http://localhost:8080/operations?cursor=6606621773926401&limit=1&order=asc + href: http://localhost:8080/accounts/GDMQQNJM4UL7QIA66P7R2PZHMQINWZBM77BEBMHLFXD5JEUAHGJ7R4JZ/operations?cursor=6606621773926401&limit=1&order=asc prev: - href: http://localhost:8080/operations?cursor=6606621773926401&limit=1&order=desc + href: http://localhost:8080/accounts/GDMQQNJM4UL7QIA66P7R2PZHMQINWZBM77BEBMHLFXD5JEUAHGJ7R4JZ/operations?cursor=6606621773926401&limit=1&order=desc _embedded: records: - _links: self: href: http://localhost:8080/operations/6606621773926401 - transaction: - href: http://localhost:8080/transactions/544469b76cd90978345a4734a0ce69a9d0ddb4a6595a7afc503225a77826722a - effects: - href: http://localhost:8080/operations/6606621773926401/effects - succeeds: - href: http://localhost:8080/effects?order=desc&cursor=6606621773926401 - precedes: - href: http://localhost:8080/effects?order=asc&cursor=6606621773926401 id: '6606621773926401' paging_token: '6606621773926401' transaction_successful: true @@ -128,19 +55,60 @@ paths: selling_asset_code: EURV selling_asset_issuer: GAXXMQMTDUQ4YEPXJMKFBGN3GETPJNEXEUHFCQJKGJDVI3XQCNBU3OZI offer_id: '425531' - summary: Get Operations by paged list - operationId: GetOperations - description: Retrieve operations by paged listing. - tags: [] - parameters: + summary: Get Operations by Account ID and Paged list + description: Get Operations by Account ID and Paged list + tags: [] + /accounts/{account_id}/transactions: + get: + operationId: GetTransactionsByAccountId + parameters: - $ref: '#/components/parameters/CursorParam' - $ref: '#/components/parameters/LimitParam' - - in: query - name: account - required: false - schema: - type: string - description: Get all operations that are related to this account id + - $ref: '#/components/parameters/AccountIDParam' + responses: + '200': + description: OK + headers: {} + content: + application/json: + schema: + $ref: '#/components/schemas/CollectionModel_Tx' + example: + _links: + self: + href: http://localhost:8080/accounts/GDMQQNJM4UL7QIA66P7R2PZHMQINWZBM77BEBMHLFXD5JEUAHGJ7R4JZ/transactions?cursor=&limit=0&order= + next: + href: http://localhost:8080/accounts/GDMQQNJM4UL7QIA66P7R2PZHMQINWZBM77BEBMHLFXD5JEUAHGJ7R4JZ/transactions?cursor=6606621773930497&limit=0&order= + prev: + href: http://localhost:8080/accounts/GDMQQNJM4UL7QIA66P7R2PZHMQINWZBM77BEBMHLFXD5JEUAHGJ7R4JZ/transactions?cursor=6606621773930497&limit=0&order=asc + _embedded: + records: + - memo: xdr.MemoText("psp:1405") + _links: + self: + href: http://localhost:8080/transactions/5fef21d5ef75ecf18d65a160cfab17dca8dbf6dbc4e2fd66a510719ad8dddb09 + id: 5fef21d5ef75ecf18d65a160cfab17dca8dbf6dbc4e2fd66a510719ad8dddb09 + paging_token: '6606621773930497' + successful: false + hash: 5fef21d5ef75ecf18d65a160cfab17dca8dbf6dbc4e2fd66a510719ad8dddb09 + ledger: 1538224 + created_at: '2022-06-17T23:29:42Z' + source_account: GCFJN22UG6IZHXKDVAJWAVEQ3NERGCRCURR2FHARNRBNLYFEQZGML4PW + source_account_sequence: '' + fee_account: '' + fee_charged: '3000' + max_fee: '0' + operation_count: 1 + envelope_xdr: AAAAAgAAAACKlutUN5GT3UOoE2BUkNtJEwoipGOinBFsQtXgpIZMxQAAJxAAE05oAAHUKAAAAAEAAAAAAAAAAAAAAABirQ6AAAAAAQAAAAhwc3A6MTQwNQAAAAEAAAAAAAAAAQAAAADpPdN37FA9KVcJfmMBuD8pPcaT5jqlrMeYEOTP36Zo2AAAAAJBVE1ZUgAAAAAAAAAAAAAAZ8rWY3iaDnWNtfpvLpNaCEbKdDjrd2gQODOuKpmj1vMAAAAAGHAagAAAAAAAAAABpIZMxQAAAEDNJwYToiBR6bzElRL4ORJdXXZYO9cE3-ishQLC_fWGrPGhWrW7_UkPJWvxWdQDJBjVOHuA1Jjc94NSe91hSwEL + result_xdr: AAAAAAAAC7j_____AAAAAQAAAAAAAAAB____-gAAAAA= + result_meta_xdr: '' + fee_meta_xdr: '' + memo_type: MemoTypeMemoText + signatures: + - pIZMxQAAAEDNJwYToiBR6bzElRL4ORJdXXZYO9cE3-ishQLC_fWGrPGhWrW7_UkPJWvxWdQDJBjVOHuA1Jjc94NSe91hSwEL + summary: Get Transactions by Account ID and Paged list + description: Get Transactions by Account ID and Paged list + tags: [] components: parameters: CursorParam: @@ -159,6 +127,22 @@ components: type: integer default: 10 description: The numbers of items to return + AccountIDParam: + name: account_id + in: path + required: true + description: The strkey encoded Account ID + schema: + type: string + example: GDMQQNJM4UL7QIA66P7R2PZHMQINWZBM77BEBMHLFXD5JEUAHGJ7R4JZ + TransactionIDParam: + name: tx_id + in: path + required: true + description: The Transaction hash, it's id. + schema: + type: string + example: a221f4743450736cba4a78940f22b01e1f64568eec8cb04c2ae37874d86cee3d schemas: CollectionModelItem: type: object diff --git a/exp/lighthorizon/actions/transaction.go b/exp/lighthorizon/actions/transaction.go deleted file mode 100644 index 16a9654a3a..0000000000 --- a/exp/lighthorizon/actions/transaction.go +++ /dev/null @@ -1,145 +0,0 @@ -package actions - -import ( - "encoding/hex" - "fmt" - "io" - "net/http" - "strconv" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/stellar/go/support/log" - - "github.com/stellar/go/exp/lighthorizon/adapters" - "github.com/stellar/go/exp/lighthorizon/archive" - "github.com/stellar/go/exp/lighthorizon/index" - hProtocol "github.com/stellar/go/protocols/horizon" - "github.com/stellar/go/support/render/hal" - "github.com/stellar/go/toid" -) - -var ( - requestCount = promauto.NewCounter(prometheus.CounterOpts{ - Name: "horizon_lite_request_count", - Help: "How many requests have occurred?", - }) - requestTime = promauto.NewHistogram(prometheus.HistogramOpts{ - Name: "horizon_lite_request_duration", - Help: "How long do requests take?", - Buckets: append( - prometheus.LinearBuckets(0, 50, 20), - prometheus.LinearBuckets(1000, 1000, 8)..., - ), - }) -) - -func Transactions(archiveWrapper archive.Wrapper, indexStore index.Store) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - start := time.Now() - defer func() { - duration := time.Since(start) - requestTime.Observe(float64(duration.Milliseconds())) - }() - requestCount.Inc() - - // For _links rendering, imitate horizon.stellar.org links for horizon-cmp - r.URL.Scheme = "http" - r.URL.Host = "localhost:8080" - - if r.Method != "GET" { - sendErrorResponse(w, http.StatusMethodNotAllowed, "") - return - } - - paginate, err := Paging(r) - if err != nil { - sendErrorResponse(w, http.StatusBadRequest, string(InvalidPagingParameters)) - return - } - - if paginate.Cursor < 1 { - paginate.Cursor = toid.New(1, 1, 1).ToInt64() - } - - if paginate.Limit < 1 { - paginate.Limit = 10 - } - - page := hal.Page{ - Cursor: strconv.FormatInt(paginate.Cursor, 10), - Order: string(paginate.Order), - Limit: uint64(paginate.Limit), - } - page.Init() - page.FullURL = r.URL - - // For now, use a query param for now to avoid dragging in chi-router. Not - // really the point of the experiment yet. - txId, err := RequestUnaryParam(r, "id") - if err != nil { - log.Error(err) - sendErrorResponse(w, http.StatusInternalServerError, "") - return - } - - if txId != "" { - // if 'id' is on request, it overrides any paging parameters on the request. - var b []byte - b, err = hex.DecodeString(txId) - if err != nil { - sendErrorResponse(w, http.StatusBadRequest, - "Invalid transaction id request parameter, not valid hex encoding") - return - } - if len(b) != 32 { - sendErrorResponse(w, http.StatusBadRequest, - "Invalid transaction id request parameter, the encoded hex value must decode to length of 32 bytes") - return - } - var hash [32]byte - copy(hash[:], b) - - var foundTOID int64 - foundTOID, err = indexStore.TransactionTOID(hash) - if err == io.EOF { - log.Error(err) - sendErrorResponse(w, http.StatusNotFound, - fmt.Sprintf("Transaction with ID %x does not exist", hash)) - return - } else if err != nil { - log.Error(err) - sendErrorResponse(w, http.StatusInternalServerError, "") - return - } - - paginate.Cursor = foundTOID - paginate.Limit = 1 - } - - //TODO - implement paginate.Order(asc/desc) - txns, err := archiveWrapper.GetTransactions(r.Context(), paginate.Cursor, paginate.Limit) - if err != nil { - log.Error(err) - sendErrorResponse(w, http.StatusInternalServerError, "") - return - } - - for _, txn := range txns { - var response hProtocol.Transaction - txn.NetworkPassphrase = archiveWrapper.Passphrase - response, err = adapters.PopulateTransaction(r, &txn) - if err != nil { - log.Error(err) - sendErrorResponse(w, http.StatusInternalServerError, "") - return - } - - page.Add(response) - } - - page.PopulateLinks() - sendPageResponse(w, page) - } -} diff --git a/exp/lighthorizon/archive/ingest_archive.go b/exp/lighthorizon/archive/ingest_archive.go index de26414de0..01ac0b6158 100644 --- a/exp/lighthorizon/archive/ingest_archive.go +++ b/exp/lighthorizon/archive/ingest_archive.go @@ -3,6 +3,7 @@ package archive import ( "context" + "github.com/stellar/go/exp/lighthorizon/index" "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/ledgerbackend" @@ -15,7 +16,7 @@ type ingestArchive struct { *ledgerbackend.HistoryArchiveBackend } -func (ingestArchive) NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (LedgerTransactionReader, error) { +func (a ingestArchive) NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (LedgerTransactionReader, error) { ingestReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledgerCloseMeta) if err != nil { @@ -25,6 +26,42 @@ func (ingestArchive) NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassph return &ingestTransactionReaderAdaption{ingestReader}, nil } +func (a ingestArchive) GetTransactionParticipants(transaction LedgerTransaction) (map[string]struct{}, error) { + participants, err := index.GetTransactionParticipants(a.ingestTx(transaction)) + if err != nil { + return nil, err + } + set := make(map[string]struct{}) + exists := struct{}{} + for _, participant := range participants { + set[participant] = exists + } + return set, nil +} + +func (a ingestArchive) GetOperationParticipants(transaction LedgerTransaction, operation xdr.Operation, opIndex int) (map[string]struct{}, error) { + participants, err := index.GetOperationParticipants(a.ingestTx(transaction), operation, opIndex) + if err != nil { + return nil, err + } + set := make(map[string]struct{}) + exists := struct{}{} + for _, participant := range participants { + set[participant] = exists + } + return set, nil +} + +func (ingestArchive) ingestTx(transaction LedgerTransaction) ingest.LedgerTransaction { + tx := ingest.LedgerTransaction{} + tx.Index = transaction.Index + tx.Envelope = transaction.Envelope + tx.Result = transaction.Result + tx.FeeChanges = transaction.FeeChanges + tx.UnsafeMeta = transaction.UnsafeMeta + return tx +} + type ingestTransactionReaderAdaption struct { *ingest.LedgerTransactionReader } diff --git a/exp/lighthorizon/archive/main.go b/exp/lighthorizon/archive/main.go index 03e40a4f10..1f46a68105 100644 --- a/exp/lighthorizon/archive/main.go +++ b/exp/lighthorizon/archive/main.go @@ -2,12 +2,6 @@ package archive import ( "context" - "io" - - "github.com/stellar/go/exp/lighthorizon/common" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/support/log" - "github.com/stellar/go/toid" "github.com/stellar/go/xdr" ) @@ -36,148 +30,27 @@ type LedgerTransactionReader interface { // Archive here only has the methods LightHorizon cares about, to make caching/wrapping easier type Archive interface { - GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) - Close() error - NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (LedgerTransactionReader, error) -} - -type Wrapper struct { - Archive - Passphrase string -} - -func (a *Wrapper) GetOperations(ctx context.Context, cursor int64, limit int64) ([]common.Operation, error) { - parsedID := toid.Parse(cursor) - ledgerSequence := uint32(parsedID.LedgerSequence) - if ledgerSequence < 2 { - ledgerSequence = 2 - } - - log.Debugf("Searching op %d", cursor) - log.Debugf("Getting ledgers starting at %d", ledgerSequence) - - ops := []common.Operation{} - appending := false - - for { - log.Debugf("Checking ledger %d", ledgerSequence) - ledger, err := a.GetLedger(ctx, ledgerSequence) - if err != nil { - return ops, nil - } - - reader, err := a.NewLedgerTransactionReaderFromLedgerCloseMeta(a.Passphrase, ledger) - if err != nil { - return nil, errors.Wrapf(err, "error in ledger %d", ledgerSequence) - } - - transactionOrder := int32(0) - for { - tx, err := reader.Read() - if err != nil { - if err == io.EOF { - break - } - return nil, err - } - - transactionOrder++ - for operationOrder := range tx.Envelope.Operations() { - currID := toid.New(int32(ledgerSequence), transactionOrder, int32(operationOrder+1)).ToInt64() - - if currID >= cursor { - appending = true - if currID == cursor { - continue - } - } - if appending { - ops = append(ops, common.Operation{ - TransactionEnvelope: &tx.Envelope, - TransactionResult: &tx.Result.Result, - // TODO: Use a method to get the header - LedgerHeader: &ledger.V0.LedgerHeader.Header, - OpIndex: int32(operationOrder + 1), - TxIndex: int32(transactionOrder), - }) - } - - if int64(len(ops)) == limit { - return ops, nil - } - } - } - - ledgerSequence++ - } -} - -func (a *Wrapper) GetTransactions(ctx context.Context, cursor int64, limit int64) ([]common.Transaction, error) { - parsedID := toid.Parse(cursor) - ledgerSequence := uint32(parsedID.LedgerSequence) - if ledgerSequence < 2 { - ledgerSequence = 2 - } - - log.Debugf("Searching tx %d starting at", cursor) - log.Debugf("Getting ledgers starting at %d", ledgerSequence) - - txns := []common.Transaction{} - appending := false - - for { - log.Debugf("Checking ledger %d", ledgerSequence) - ledger, err := a.GetLedger(ctx, ledgerSequence) - if err != nil { - // no 'NotFound' distinction on err, treat all as not found. - return txns, nil - } - - reader, err := a.NewLedgerTransactionReaderFromLedgerCloseMeta(a.Passphrase, ledger) - if err != nil { - return nil, err - } - - transactionOrder := int32(0) - for { - tx, err := reader.Read() - if err != nil { - if err == io.EOF { - break - } - return nil, err - } - - transactionOrder++ - currID := toid.New(int32(ledgerSequence), transactionOrder, 1).ToInt64() - - if currID >= cursor { - appending = true - if currID == cursor { - continue - } - } + // GetLedger - takes a caller context and a sequence number and returns the meta data + // for the ledger corresponding to the sequence number. If there is any error, it will + // return nil and the error. + GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) - if appending { - txns = append(txns, common.Transaction{ - TransactionEnvelope: &tx.Envelope, - TransactionResult: &tx.Result.Result, - // TODO: Use a method to get the header - LedgerHeader: &ledger.V0.LedgerHeader.Header, - TxIndex: int32(transactionOrder), - }) - } + // Close - will release any resources used for this archive instance and should be + // called at end of usage of archive. + Close() error - if int64(len(txns)) == limit { - return txns, nil - } + // NewLedgerTransactionReaderFromLedgerCloseMeta - takes the passphrase for the blockchain network + // and the LedgerCloseMeta(meta data) and returns a reader that can be used to obtain a LedgerTransaction model + // from the meta data. If there is any error, it will return nil and the error. + NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (LedgerTransactionReader, error) - if ctx.Err() != nil { - return nil, ctx.Err() - } - } + // GetTransactionParticipants - takes a LedgerTransaction and returns a set of all + // participants(accounts) in the transaction. If there is any error, it will return nil and the error. + GetTransactionParticipants(transaction LedgerTransaction) (map[string]struct{}, error) - ledgerSequence++ - } + // GetOperationParticipants - takes a LedgerTransaction, the Operation within the transaction, and + // the 0 based index of the operation within the transaction. It will return a set of all participants(accounts) + // in the operation. If there is any error, it will return nil and the error. + GetOperationParticipants(transaction LedgerTransaction, operation xdr.Operation, opIndex int) (map[string]struct{}, error) } diff --git a/exp/lighthorizon/archive/main_test.go b/exp/lighthorizon/archive/main_test.go deleted file mode 100644 index a4eb4bc2f5..0000000000 --- a/exp/lighthorizon/archive/main_test.go +++ /dev/null @@ -1,143 +0,0 @@ -package archive - -import ( - "context" - "fmt" - "io" - "testing" - - "github.com/stellar/go/xdr" - "github.com/stretchr/testify/require" -) - -func TestItGetsSequentialOperationsForLimitBeyondEnd(tt *testing.T) { - // l=1586111, t=1, o=1 - ctx := context.Background() - cursor := int64(6812294872829953) - passphrase := "Red New England clam chowder" - archiveWrapper := Wrapper{Archive: mockArchiveFixture(ctx, passphrase), Passphrase: passphrase} - ops, err := archiveWrapper.GetOperations(ctx, cursor, 5) - require.NoError(tt, err) - require.Len(tt, ops, 3) - require.Equal(tt, ops[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586111)) - require.Equal(tt, ops[0].TxIndex, int32(1)) - require.Equal(tt, ops[0].OpIndex, int32(2)) - require.Equal(tt, ops[1].LedgerHeader.LedgerSeq, xdr.Uint32(1586111)) - require.Equal(tt, ops[1].TxIndex, int32(2)) - require.Equal(tt, ops[1].OpIndex, int32(1)) - require.Equal(tt, ops[2].LedgerHeader.LedgerSeq, xdr.Uint32(1586112)) - require.Equal(tt, ops[2].TxIndex, int32(1)) - require.Equal(tt, ops[2].OpIndex, int32(1)) -} - -func TestItGetsSequentialOperationsForLimitBeforeEnd(tt *testing.T) { - // l=1586111, t=1, o=1 - ctx := context.Background() - cursor := int64(6812294872829953) - passphrase := "White New England clam chowder" - archiveWrapper := Wrapper{Archive: mockArchiveFixture(ctx, passphrase), Passphrase: passphrase} - ops, err := archiveWrapper.GetOperations(ctx, cursor, 2) - require.NoError(tt, err) - require.Len(tt, ops, 2) - require.Equal(tt, ops[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586111)) - require.Equal(tt, ops[0].TxIndex, int32(1)) - require.Equal(tt, ops[0].OpIndex, int32(2)) - require.Equal(tt, ops[1].LedgerHeader.LedgerSeq, xdr.Uint32(1586111)) - require.Equal(tt, ops[1].TxIndex, int32(2)) - require.Equal(tt, ops[1].OpIndex, int32(1)) -} - -func TestItGetsSequentialTransactionsForLimitBeyondEnd(tt *testing.T) { - // l=1586111, t=1, o=1 - ctx := context.Background() - cursor := int64(6812294872829953) - passphrase := "White New England clam chowder" - archiveWrapper := Wrapper{Archive: mockArchiveFixture(ctx, passphrase), Passphrase: passphrase} - txs, err := archiveWrapper.GetTransactions(ctx, cursor, 5) - require.NoError(tt, err) - require.Len(tt, txs, 2) - require.Equal(tt, txs[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586111)) - require.Equal(tt, txs[0].TxIndex, int32(2)) - require.Equal(tt, txs[1].LedgerHeader.LedgerSeq, xdr.Uint32(1586112)) - require.Equal(tt, txs[1].TxIndex, int32(1)) -} - -func TestItGetsSequentialTransactionsForLimitBeforeEnd(tt *testing.T) { - // l=1586111, t=1, o=1 - ctx := context.Background() - cursor := int64(6812294872829953) - passphrase := "White New England clam chowder" - archiveWrapper := Wrapper{Archive: mockArchiveFixture(ctx, passphrase), Passphrase: passphrase} - txs, err := archiveWrapper.GetTransactions(ctx, cursor, 1) - require.NoError(tt, err) - require.Len(tt, txs, 1) - require.Equal(tt, txs[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586111)) - require.Equal(tt, txs[0].TxIndex, int32(2)) -} - -func mockArchiveFixture(ctx context.Context, passphrase string) *MockArchive { - mockArchive := &MockArchive{} - mockReaderLedger1 := &MockLedgerTransactionReader{} - mockReaderLedger2 := &MockLedgerTransactionReader{} - - expectedLedger1 := testLedger(1586111) - expectedLedger2 := testLedger(1586112) - source := xdr.MustAddress("GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU") - // assert results iterate sequentially across ops-tx-ledgers - expectedLedger1Transaction1 := testLedgerTx(source, 34, 34) - expectedLedger1Transaction2 := testLedgerTx(source, 34) - expectedLedger2Transaction1 := testLedgerTx(source, 34) - - mockArchive.On("GetLedger", ctx, uint32(1586111)).Return(expectedLedger1, nil) - mockArchive.On("GetLedger", ctx, uint32(1586112)).Return(expectedLedger2, nil) - mockArchive.On("GetLedger", ctx, uint32(1586113)).Return(xdr.LedgerCloseMeta{}, fmt.Errorf("ledger not found")) - mockArchive.On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, expectedLedger1).Return(mockReaderLedger1, nil) - mockArchive.On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, expectedLedger2).Return(mockReaderLedger2, nil) - mockReaderLedger1.On("Read").Return(expectedLedger1Transaction1, nil).Once() - mockReaderLedger1.On("Read").Return(expectedLedger1Transaction2, nil).Once() - mockReaderLedger1.On("Read").Return(LedgerTransaction{}, io.EOF).Once() - mockReaderLedger2.On("Read").Return(expectedLedger2Transaction1, nil).Once() - mockReaderLedger2.On("Read").Return(LedgerTransaction{}, io.EOF).Once() - return mockArchive -} - -func testLedger(seq int) xdr.LedgerCloseMeta { - return xdr.LedgerCloseMeta{ - V0: &xdr.LedgerCloseMetaV0{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - LedgerSeq: xdr.Uint32(seq), - }, - }, - }, - } -} - -func testLedgerTx(source xdr.AccountId, bumpTos ...int) LedgerTransaction { - - ops := []xdr.Operation{} - for _, bumpTo := range bumpTos { - ops = append(ops, xdr.Operation{ - Body: xdr.OperationBody{ - BumpSequenceOp: &xdr.BumpSequenceOp{ - BumpTo: xdr.SequenceNumber(bumpTo), - }, - }, - }) - } - - tx := LedgerTransaction{ - Envelope: xdr.TransactionEnvelope{ - Type: xdr.EnvelopeTypeEnvelopeTypeTx, - V1: &xdr.TransactionV1Envelope{ - Tx: xdr.Transaction{ - SourceAccount: source.ToMuxedAccount(), - Fee: xdr.Uint32(1), - Operations: ops, - }, - }, - }, - } - - return tx -} diff --git a/exp/lighthorizon/archive/mock_archive.go b/exp/lighthorizon/archive/mock_archive.go index bdfd6b9149..b40076bc91 100644 --- a/exp/lighthorizon/archive/mock_archive.go +++ b/exp/lighthorizon/archive/mock_archive.go @@ -34,3 +34,13 @@ func (m *MockArchive) NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassp args := m.Called(networkPassphrase, ledgerCloseMeta) return args.Get(0).(LedgerTransactionReader), args.Error(1) } + +func (m *MockArchive) GetTransactionParticipants(transaction LedgerTransaction) (map[string]struct{}, error) { + args := m.Called(transaction) + return args.Get(0).(map[string]struct{}), args.Error(1) +} + +func (m *MockArchive) GetOperationParticipants(transaction LedgerTransaction, operation xdr.Operation, opIndex int) (map[string]struct{}, error) { + args := m.Called(transaction, operation, opIndex) + return args.Get(0).(map[string]struct{}), args.Error(1) +} diff --git a/exp/lighthorizon/index/cmd/batch/reduce/main.go b/exp/lighthorizon/index/cmd/batch/reduce/main.go index 20ce0ac877..5b1bfa8dae 100644 --- a/exp/lighthorizon/index/cmd/batch/reduce/main.go +++ b/exp/lighthorizon/index/cmd/batch/reduce/main.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "strconv" + "strings" "sync" "github.com/stellar/go/exp/lighthorizon/index" @@ -41,20 +42,20 @@ func ReduceConfigFromEnvironment() (*ReduceConfig, error) { indexTargetEnv = "INDEX_TARGET" ) - jobIndex, err := strconv.ParseUint(os.Getenv(jobIndexEnv), 10, 32) + jobIndex, err := strconv.ParseUint(strings.TrimSpace(os.Getenv(jobIndexEnv)), 10, 32) if err != nil { return nil, errors.Wrap(err, "invalid parameter "+jobIndexEnv) } - mapJobCount, err := strconv.ParseUint(os.Getenv(mapJobsEnv), 10, 32) + mapJobCount, err := strconv.ParseUint(strings.TrimSpace(os.Getenv(mapJobsEnv)), 10, 32) if err != nil { return nil, errors.Wrap(err, "invalid parameter "+mapJobsEnv) } - reduceJobCount, err := strconv.ParseUint(os.Getenv(reduceJobsEnv), 10, 32) + reduceJobCount, err := strconv.ParseUint(strings.TrimSpace(os.Getenv(reduceJobsEnv)), 10, 32) if err != nil { return nil, errors.Wrap(err, "invalid parameter "+reduceJobsEnv) } - workersStr := os.Getenv(workerCountEnv) + workersStr := strings.TrimSpace(os.Getenv(workerCountEnv)) if workersStr == "" { workersStr = strconv.FormatUint(DEFAULT_WORKER_COUNT, 10) } @@ -63,12 +64,12 @@ func ReduceConfigFromEnvironment() (*ReduceConfig, error) { return nil, errors.Wrap(err, "invalid parameter "+workerCountEnv) } - indexTarget := os.Getenv(indexTargetEnv) + indexTarget := strings.TrimSpace(os.Getenv(indexTargetEnv)) if indexTarget == "" { return nil, errors.New("required parameter missing " + indexTargetEnv) } - indexRootSource := os.Getenv(indexRootSourceEnv) + indexRootSource := strings.TrimSpace(os.Getenv(indexRootSourceEnv)) if indexRootSource == "" { return nil, errors.New("required parameter missing " + indexRootSourceEnv) } diff --git a/exp/lighthorizon/index/cmd/single_test.go b/exp/lighthorizon/index/cmd/single_test.go index 5dd0de143e..d275458d3d 100644 --- a/exp/lighthorizon/index/cmd/single_test.go +++ b/exp/lighthorizon/index/cmd/single_test.go @@ -212,7 +212,7 @@ func IndexLedgerRange( } require.NoError(t, err) - participants, err := index.GetParticipants(tx) + participants, err := index.GetTransactionParticipants(tx) require.NoError(t, err) for _, participant := range participants { diff --git a/exp/lighthorizon/index/mock_store.go b/exp/lighthorizon/index/mock_store.go new file mode 100644 index 0000000000..db0e53e1cc --- /dev/null +++ b/exp/lighthorizon/index/mock_store.go @@ -0,0 +1,78 @@ +package index + +import ( + "github.com/prometheus/client_golang/prometheus" + types "github.com/stellar/go/exp/lighthorizon/index/types" + "github.com/stretchr/testify/mock" +) + +type MockStore struct { + mock.Mock +} + +func (m *MockStore) NextActive(account, index string, afterCheckpoint uint32) (uint32, error) { + args := m.Called(account, index, afterCheckpoint) + return args.Get(0).(uint32), args.Error(1) +} + +func (m *MockStore) TransactionTOID(hash [32]byte) (int64, error) { + args := m.Called(hash) + return args.Get(0).(int64), args.Error(1) +} + +func (m *MockStore) AddTransactionToIndexes(txnTOID int64, hash [32]byte) error { + args := m.Called(txnTOID, hash) + return args.Error(0) +} + +func (m *MockStore) AddParticipantsToIndexes(checkpoint uint32, index string, participants []string) error { + args := m.Called(checkpoint, index, participants) + return args.Error(0) +} + +func (m *MockStore) AddParticipantsToIndexesNoBackend(checkpoint uint32, index string, participants []string) error { + args := m.Called(checkpoint, index, participants) + return args.Error(0) +} + +func (m *MockStore) AddParticipantToIndexesNoBackend(participant string, indexes types.NamedIndices) { + m.Called(participant, indexes) +} + +func (m *MockStore) Flush() error { + args := m.Called() + return args.Error(0) +} + +func (m *MockStore) FlushAccounts() error { + args := m.Called() + return args.Error(0) +} + +func (m *MockStore) ClearMemory(arg bool) { + m.Called(arg) +} + +func (m *MockStore) Read(account string) (types.NamedIndices, error) { + args := m.Called(account) + return args.Get(0).(types.NamedIndices), args.Error(1) +} + +func (m *MockStore) ReadAccounts() ([]string, error) { + args := m.Called() + return args.Get(0).([]string), args.Error(1) +} + +func (m *MockStore) ReadTransactions(prefix string) (*types.TrieIndex, error) { + args := m.Called(prefix) + return args.Get(0).(*types.TrieIndex), args.Error(1) +} + +func (m *MockStore) MergeTransactions(prefix string, other *types.TrieIndex) error { + args := m.Called(prefix, other) + return args.Error(0) +} + +func (m *MockStore) RegisterMetrics(registry *prometheus.Registry) { + m.Called(registry) +} diff --git a/exp/lighthorizon/index/modules.go b/exp/lighthorizon/index/modules.go index 295f1a23e9..9250125ab9 100644 --- a/exp/lighthorizon/index/modules.go +++ b/exp/lighthorizon/index/modules.go @@ -25,7 +25,7 @@ func ProcessAccounts( tx ingest.LedgerTransaction, ) error { checkpoint := (ledger.LedgerSequence() / 64) + 1 - allParticipants, err := GetParticipants(tx) + allParticipants, err := GetTransactionParticipants(tx) if err != nil { return err } @@ -66,7 +66,7 @@ func ProcessAccountsWithoutBackend( tx ingest.LedgerTransaction, ) error { checkpoint := (ledger.LedgerSequence() / 64) + 1 - allParticipants, err := GetParticipants(tx) + allParticipants, err := GetTransactionParticipants(tx) if err != nil { return err } @@ -105,133 +105,143 @@ func GetPaymentParticipants(transaction ingest.LedgerTransaction) ([]string, err return participantsForOperations(transaction, true) } -func GetParticipants(transaction ingest.LedgerTransaction) ([]string, error) { +func GetTransactionParticipants(transaction ingest.LedgerTransaction) ([]string, error) { return participantsForOperations(transaction, false) } +// transaction - the ledger transaction +// operation - the operation within this transaction +// opIndex - the 0 based index of the operation within the transaction +func GetOperationParticipants(transaction ingest.LedgerTransaction, operation xdr.Operation, opIndex int) ([]string, error) { + return participantsForOperation(transaction, operation, opIndex, false) +} + func participantsForOperations(transaction ingest.LedgerTransaction, onlyPayments bool) ([]string, error) { var participants []string for opindex, operation := range transaction.Envelope.Operations() { - opSource := operation.SourceAccount - if opSource == nil { - txSource := transaction.Envelope.SourceAccount() - opSource = &txSource + opParticipants, err := participantsForOperation(transaction, operation, opindex, onlyPayments) + if err != nil { + return []string{}, err } + participants = append(participants, opParticipants...) + } - switch operation.Body.Type { - case xdr.OperationTypeCreateAccount, - xdr.OperationTypePayment, - xdr.OperationTypePathPaymentStrictReceive, - xdr.OperationTypePathPaymentStrictSend, - xdr.OperationTypeAccountMerge: - participants = append(participants, opSource.Address()) - - default: - if onlyPayments { - continue - } - participants = append(participants, opSource.Address()) + // FIXME: Can/Should we make this a set? It may mean less superfluous + // insertions into the index if there's a lot of activity by this + // account in this transaction. + return participants, nil +} + +// transaction - the ledger transaction +// operation - the operation within this transaction +// opIndex - the 0 based index of the operation within the transaction +func participantsForOperation(transaction ingest.LedgerTransaction, operation xdr.Operation, opIndex int, onlyPayments bool) ([]string, error) { + participants := []string{} + opSource := operation.SourceAccount + if opSource == nil { + txSource := transaction.Envelope.SourceAccount() + opSource = &txSource + } + switch operation.Body.Type { + case xdr.OperationTypeCreateAccount, + xdr.OperationTypePayment, + xdr.OperationTypePathPaymentStrictReceive, + xdr.OperationTypePathPaymentStrictSend, + xdr.OperationTypeAccountMerge: + participants = append(participants, opSource.Address()) + + default: + if onlyPayments { + return participants, nil } + participants = append(participants, opSource.Address()) + } - switch operation.Body.Type { - case xdr.OperationTypeCreateAccount: - participants = append(participants, operation.Body.MustCreateAccountOp().Destination.Address()) + switch operation.Body.Type { + case xdr.OperationTypeCreateAccount: + participants = append(participants, operation.Body.MustCreateAccountOp().Destination.Address()) - case xdr.OperationTypePayment: - participants = append(participants, operation.Body.MustPaymentOp().Destination.ToAccountId().Address()) + case xdr.OperationTypePayment: + participants = append(participants, operation.Body.MustPaymentOp().Destination.ToAccountId().Address()) - case xdr.OperationTypePathPaymentStrictReceive: - participants = append(participants, operation.Body.MustPathPaymentStrictReceiveOp().Destination.ToAccountId().Address()) + case xdr.OperationTypePathPaymentStrictReceive: + participants = append(participants, operation.Body.MustPathPaymentStrictReceiveOp().Destination.ToAccountId().Address()) - case xdr.OperationTypePathPaymentStrictSend: - participants = append(participants, operation.Body.MustPathPaymentStrictSendOp().Destination.ToAccountId().Address()) + case xdr.OperationTypePathPaymentStrictSend: + participants = append(participants, operation.Body.MustPathPaymentStrictSendOp().Destination.ToAccountId().Address()) - case xdr.OperationTypeAllowTrust: - participants = append(participants, operation.Body.MustAllowTrustOp().Trustor.Address()) + case xdr.OperationTypeAllowTrust: + participants = append(participants, operation.Body.MustAllowTrustOp().Trustor.Address()) - case xdr.OperationTypeAccountMerge: - participants = append(participants, operation.Body.MustDestination().ToAccountId().Address()) + case xdr.OperationTypeAccountMerge: + participants = append(participants, operation.Body.MustDestination().ToAccountId().Address()) - case xdr.OperationTypeCreateClaimableBalance: - for _, c := range operation.Body.MustCreateClaimableBalanceOp().Claimants { - participants = append(participants, c.MustV0().Destination.Address()) - } + case xdr.OperationTypeCreateClaimableBalance: + for _, c := range operation.Body.MustCreateClaimableBalanceOp().Claimants { + participants = append(participants, c.MustV0().Destination.Address()) + } - case xdr.OperationTypeBeginSponsoringFutureReserves: - participants = append(participants, operation.Body.MustBeginSponsoringFutureReservesOp().SponsoredId.Address()) - - case xdr.OperationTypeEndSponsoringFutureReserves: - // Failed transactions may not have a compliant sandwich structure - // we can rely on (e.g. invalid nesting or a being operation with - // the wrong sponsoree ID) and thus we bail out since we could - // return incorrect information. - if transaction.Result.Successful() { - sponsoree := transaction.Envelope.SourceAccount().ToAccountId().Address() - if operation.SourceAccount != nil { - sponsoree = operation.SourceAccount.Address() - } - operations := transaction.Envelope.Operations() - for i := int(opindex) - 1; i >= 0; i-- { - if beginOp, ok := operations[i].Body.GetBeginSponsoringFutureReservesOp(); ok && - beginOp.SponsoredId.Address() == sponsoree { - participants = append(participants, beginOp.SponsoredId.Address()) - } - } + case xdr.OperationTypeBeginSponsoringFutureReserves: + participants = append(participants, operation.Body.MustBeginSponsoringFutureReservesOp().SponsoredId.Address()) + + case xdr.OperationTypeEndSponsoringFutureReserves: + // Failed transactions may not have a compliant sandwich structure + // we can rely on (e.g. invalid nesting or a being operation with + // the wrong sponsoree ID) and thus we bail out since we could + // return incorrect information. + if transaction.Result.Successful() { + sponsoree := transaction.Envelope.SourceAccount().ToAccountId().Address() + if operation.SourceAccount != nil { + sponsoree = operation.SourceAccount.Address() } - - case xdr.OperationTypeRevokeSponsorship: - op := operation.Body.MustRevokeSponsorshipOp() - switch op.Type { - case xdr.RevokeSponsorshipTypeRevokeSponsorshipLedgerEntry: - participants = append(participants, getLedgerKeyParticipants(*op.LedgerKey)...) - - case xdr.RevokeSponsorshipTypeRevokeSponsorshipSigner: - participants = append(participants, op.Signer.AccountId.Address()) - // We don't add signer as a participant because a signer can be - // arbitrary account. This can spam successful operations - // history of any account. + operations := transaction.Envelope.Operations() + for i := opIndex - 1; i >= 0; i-- { + if beginOp, ok := operations[i].Body.GetBeginSponsoringFutureReservesOp(); ok && + beginOp.SponsoredId.Address() == sponsoree { + participants = append(participants, beginOp.SponsoredId.Address()) + } } + } - case xdr.OperationTypeClawback: - op := operation.Body.MustClawbackOp() - participants = append(participants, op.From.ToAccountId().Address()) - - case xdr.OperationTypeSetTrustLineFlags: - op := operation.Body.MustSetTrustLineFlagsOp() - participants = append(participants, op.Trustor.Address()) - - // for the following, the only direct participant is the source_account - case xdr.OperationTypeManageBuyOffer: - case xdr.OperationTypeManageSellOffer: - case xdr.OperationTypeCreatePassiveSellOffer: - case xdr.OperationTypeSetOptions: - case xdr.OperationTypeChangeTrust: - case xdr.OperationTypeInflation: - case xdr.OperationTypeManageData: - case xdr.OperationTypeBumpSequence: - case xdr.OperationTypeClaimClaimableBalance: - case xdr.OperationTypeClawbackClaimableBalance: - case xdr.OperationTypeLiquidityPoolDeposit: - case xdr.OperationTypeLiquidityPoolWithdraw: - - default: - return nil, fmt.Errorf("unknown operation type: %s", operation.Body.Type) + case xdr.OperationTypeRevokeSponsorship: + op := operation.Body.MustRevokeSponsorshipOp() + switch op.Type { + case xdr.RevokeSponsorshipTypeRevokeSponsorshipLedgerEntry: + participants = append(participants, getLedgerKeyParticipants(*op.LedgerKey)...) + + case xdr.RevokeSponsorshipTypeRevokeSponsorshipSigner: + participants = append(participants, op.Signer.AccountId.Address()) + // We don't add signer as a participant because a signer can be + // arbitrary account. This can spam successful operations + // history of any account. } - // Requires meta - // sponsor, err := operation.getSponsor() - // if err != nil { - // return nil, err - // } - // if sponsor != nil { - // otherParticipants = append(otherParticipants, *sponsor) - // } + case xdr.OperationTypeClawback: + op := operation.Body.MustClawbackOp() + participants = append(participants, op.From.ToAccountId().Address()) + + case xdr.OperationTypeSetTrustLineFlags: + op := operation.Body.MustSetTrustLineFlagsOp() + participants = append(participants, op.Trustor.Address()) + + // for the following, the only direct participant is the source_account + case xdr.OperationTypeManageBuyOffer: + case xdr.OperationTypeManageSellOffer: + case xdr.OperationTypeCreatePassiveSellOffer: + case xdr.OperationTypeSetOptions: + case xdr.OperationTypeChangeTrust: + case xdr.OperationTypeInflation: + case xdr.OperationTypeManageData: + case xdr.OperationTypeBumpSequence: + case xdr.OperationTypeClaimClaimableBalance: + case xdr.OperationTypeClawbackClaimableBalance: + case xdr.OperationTypeLiquidityPoolDeposit: + case xdr.OperationTypeLiquidityPoolWithdraw: + + default: + return nil, fmt.Errorf("unknown operation type: %s", operation.Body.Type) } - - // FIXME: Can/Should we make this a set? It may mean less superfluous - // insertions into the index if there's a lot of activity by this - // account in this transaction. return participants, nil } diff --git a/exp/lighthorizon/main.go b/exp/lighthorizon/main.go index 0e5870f8d9..604aa49e81 100644 --- a/exp/lighthorizon/main.go +++ b/exp/lighthorizon/main.go @@ -4,12 +4,14 @@ import ( "flag" "net/http" + "github.com/go-chi/chi" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/stellar/go/exp/lighthorizon/actions" "github.com/stellar/go/exp/lighthorizon/archive" "github.com/stellar/go/exp/lighthorizon/index" + "github.com/stellar/go/exp/lighthorizon/services" "github.com/stellar/go/network" "github.com/stellar/go/support/log" @@ -41,13 +43,29 @@ func main() { } defer ingestArchive.Close() - archiveWrapper := archive.Wrapper{Archive: ingestArchive, Passphrase: *networkPassphrase} + Config := services.Config{ + Archive: ingestArchive, + Passphrase: *networkPassphrase, + IndexStore: indexStore, + } + + lightHorizon := services.LightHorizon{ + Transactions: services.TransactionsService{ + Config: Config, + }, + Operations: services.OperationsService{ + Config: Config, + }, + } - http.HandleFunc("/", actions.ApiDocs()) - http.HandleFunc("/operations", actions.Operations(archiveWrapper, indexStore)) - http.HandleFunc("/transactions", actions.Transactions(archiveWrapper, indexStore)) + router := chi.NewMux() + router.Route("/accounts/{account_id}", func(r chi.Router) { + r.MethodFunc(http.MethodGet, "/transactions", actions.NewTXByAccountHandler(lightHorizon)) + r.MethodFunc(http.MethodGet, "/operations", actions.NewOpsByAccountHandler(lightHorizon)) + }) - http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) + router.MethodFunc(http.MethodGet, "/", actions.ApiDocs()) + router.Method(http.MethodGet, "/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) - log.Fatal(http.ListenAndServe(":8080", nil)) + log.Fatal(http.ListenAndServe(":8080", router)) } diff --git a/exp/lighthorizon/services/main.go b/exp/lighthorizon/services/main.go new file mode 100644 index 0000000000..7d5a4bcefb --- /dev/null +++ b/exp/lighthorizon/services/main.go @@ -0,0 +1,193 @@ +package services + +import ( + "context" + "io" + + "github.com/stellar/go/exp/lighthorizon/archive" + "github.com/stellar/go/exp/lighthorizon/common" + "github.com/stellar/go/exp/lighthorizon/index" + "github.com/stellar/go/historyarchive" + "github.com/stellar/go/toid" + "github.com/stellar/go/xdr" + + "github.com/stellar/go/support/errors" + "github.com/stellar/go/support/log" +) + +const ( + allIndexes = "all/all" +) + +var ( + checkpointManager = historyarchive.NewCheckpointManager(0) +) + +type LightHorizon struct { + Operations OperationsService + Transactions TransactionsService +} + +type Config struct { + Archive archive.Archive + IndexStore index.Store + Passphrase string +} + +type TransactionsService struct { + TransactionRepository + Config Config +} + +type OperationsService struct { + OperationsRepository, + Config Config +} + +type OperationsRepository interface { + GetOperationsByAccount(ctx context.Context, cursor int64, limit uint64, accountId string) ([]common.Operation, error) +} + +type TransactionRepository interface { + GetTransactionsByAccount(ctx context.Context, cursor int64, limit uint64, accountId string) ([]common.Transaction, error) +} + +type searchCallback func(archive.LedgerTransaction, *xdr.LedgerHeader) (finished bool, err error) + +func (os *OperationsService) GetOperationsByAccount(ctx context.Context, cursor int64, limit uint64, accountId string) ([]common.Operation, error) { + ops := []common.Operation{} + opsCallback := func(tx archive.LedgerTransaction, ledgerHeader *xdr.LedgerHeader) (bool, error) { + for operationOrder, op := range tx.Envelope.Operations() { + opParticipants, opParticipantErr := os.Config.Archive.GetOperationParticipants(tx, op, operationOrder) + if opParticipantErr != nil { + return false, opParticipantErr + } + if _, foundInOp := opParticipants[accountId]; foundInOp { + ops = append(ops, common.Operation{ + TransactionEnvelope: &tx.Envelope, + TransactionResult: &tx.Result.Result, + LedgerHeader: ledgerHeader, + TxIndex: int32(tx.Index), + OpIndex: int32(operationOrder), + }) + if uint64(len(ops)) == limit { + return true, nil + } + } + } + return false, nil + } + + if err := searchTxByAccount(ctx, cursor, accountId, os.Config, opsCallback); err != nil { + return nil, err + } + + return ops, nil +} + +func (ts *TransactionsService) GetTransactionsByAccount(ctx context.Context, cursor int64, limit uint64, accountId string) ([]common.Transaction, error) { + txs := []common.Transaction{} + + txsCallback := func(tx archive.LedgerTransaction, ledgerHeader *xdr.LedgerHeader) (bool, error) { + txs = append(txs, common.Transaction{ + TransactionEnvelope: &tx.Envelope, + TransactionResult: &tx.Result.Result, + LedgerHeader: ledgerHeader, + TxIndex: int32(tx.Index), + NetworkPassphrase: ts.Config.Passphrase, + }) + return (uint64(len(txs)) >= limit), nil + } + + if err := searchTxByAccount(ctx, cursor, accountId, ts.Config, txsCallback); err != nil { + return nil, err + } + return txs, nil +} + +func searchTxByAccount(ctx context.Context, cursor int64, accountId string, config Config, callback searchCallback) error { + nextLedger, err := getAccountNextLedgerCursor(accountId, cursor, config.IndexStore, allIndexes) + if err == io.EOF { + return nil + } else if err != nil { + return err + } + log.Debugf("Searching index by account %v starting at cursor %v", accountId, nextLedger) + + for { + ledger, ledgerErr := config.Archive.GetLedger(ctx, uint32(nextLedger)) + if ledgerErr != nil { + return errors.Wrapf(ledgerErr, "ledger export state is out of sync, missing ledger %v from checkpoint %v", nextLedger, getIndexCheckpointCounter(uint32(nextLedger))) + } + + reader, readerErr := config.Archive.NewLedgerTransactionReaderFromLedgerCloseMeta(config.Passphrase, ledger) + if readerErr != nil { + return readerErr + } + + for { + tx, readErr := reader.Read() + if readErr != nil { + if readErr == io.EOF { + break + } + return readErr + } + + participants, participantErr := config.Archive.GetTransactionParticipants(tx) + if participantErr != nil { + return participantErr + } + + if _, found := participants[accountId]; found { + if finished, callBackErr := callback(tx, &ledger.V0.LedgerHeader.Header); callBackErr != nil { + return callBackErr + } else if finished { + return nil + } + } + + if ctx.Err() != nil { + return ctx.Err() + } + } + nextCursor := toid.New(int32(nextLedger), 1, 1).ToInt64() + nextLedger, err = getAccountNextLedgerCursor(accountId, nextCursor, config.IndexStore, allIndexes) + if err == io.EOF { + return nil + } else if err != nil { + return err + } + } +} + +// this deals in ledgers but adapts to the index model, which is currently keyed by checkpoint for now +func getAccountNextLedgerCursor(accountId string, cursor int64, store index.Store, indexName string) (uint64, error) { + nextLedger := uint32(toid.Parse(cursor).LedgerSequence + 1) + + // done for performance reasons, skip reading the index for any requested ledger cursors + // only need to read the index when next cursor falls on checkpoint boundary + if !checkpointManager.IsCheckpoint(nextLedger) { + return uint64(nextLedger), nil + } + + // the 'NextActive' index query takes a starting checkpoint, from which the index is scanned AFTER that checkpoint, non-inclusive + // use the the currrent checkpoint as the starting point since it represents up to the cursor's ledger + queryStartingCheckpoint := getIndexCheckpointCounter(nextLedger) + indexNextCheckpoint, err := store.NextActive(accountId, indexName, queryStartingCheckpoint) + + if err != nil { + return 0, err + } + + // return the first ledger of the next index checkpoint that had account activity after cursor + // so we need to go back 64 ledgers(one checkpoint's worth) relative to next index checkpoint + // to get first ledger, since checkpoint ledgers are the last/greatest ledger in the checkpoint range + return uint64((indexNextCheckpoint - 1) * checkpointManager.GetCheckpointFrequency()), nil +} + +// derives what checkpoint this ledger would be in the index +func getIndexCheckpointCounter(ledger uint32) uint32 { + return (checkpointManager.GetCheckpoint(uint32(ledger)) / + checkpointManager.GetCheckpointFrequency()) + 1 +} diff --git a/exp/lighthorizon/services/main_test.go b/exp/lighthorizon/services/main_test.go new file mode 100644 index 0000000000..ab472f2883 --- /dev/null +++ b/exp/lighthorizon/services/main_test.go @@ -0,0 +1,226 @@ +package services + +import ( + "context" + "io" + "testing" + + "github.com/stellar/go/exp/lighthorizon/archive" + "github.com/stellar/go/exp/lighthorizon/index" + "github.com/stellar/go/xdr" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestItGetsTransactionsByAccount(tt *testing.T) { + // l=1586045, t=1, o=1 + // cursor = 6812011404988417, checkpoint=24781 + + cursor := int64(6812011404988417) + ctx := context.Background() + passphrase := "White New England clam chowder" + archive, store := mockArchiveAndIndex(ctx, passphrase) + txService := TransactionsService{ + Config: Config{ + Archive: archive, + IndexStore: store, + Passphrase: passphrase, + }, + } + accountId := "GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX" + // this should start at next checkpoint + txs, err := txService.GetTransactionsByAccount(ctx, cursor, 1, accountId) + require.NoError(tt, err) + require.Len(tt, txs, 1) + require.Equal(tt, txs[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586113)) + require.Equal(tt, txs[0].TxIndex, int32(2)) +} + +func TestItGetsTransactionsByAccountAndPageLimit(tt *testing.T) { + // l=1586045, t=1, o=1 + // cursor = 6812011404988417, checkpoint=24781 + + cursor := int64(6812011404988417) + ctx := context.Background() + passphrase := "White New England clam chowder" + archive, store := mockArchiveAndIndex(ctx, passphrase) + txService := TransactionsService{ + Config: Config{ + Archive: archive, + IndexStore: store, + Passphrase: passphrase, + }, + } + accountId := "GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX" + // this should start at next checkpoint + txs, err := txService.GetTransactionsByAccount(ctx, cursor, 5, accountId) + require.NoError(tt, err) + require.Len(tt, txs, 2) + require.Equal(tt, txs[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586113)) + require.Equal(tt, txs[0].TxIndex, int32(2)) + require.Equal(tt, txs[1].LedgerHeader.LedgerSeq, xdr.Uint32(1586114)) + require.Equal(tt, txs[1].TxIndex, int32(1)) +} + +func TestItGetsOperationsByAccount(tt *testing.T) { + // l=1586045, t=1, o=1 + // cursor = 6812011404988417, checkpoint=24781 + + cursor := int64(6812011404988417) + ctx := context.Background() + passphrase := "White New England clam chowder" + archive, store := mockArchiveAndIndex(ctx, passphrase) + opsService := OperationsService{ + Config: Config{ + Archive: archive, + IndexStore: store, + Passphrase: passphrase, + }, + } + accountId := "GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX" + // this should start at next checkpoint + ops, err := opsService.GetOperationsByAccount(ctx, cursor, 1, accountId) + require.NoError(tt, err) + require.Len(tt, ops, 1) + require.Equal(tt, ops[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586113)) + require.Equal(tt, ops[0].TxIndex, int32(2)) +} + +func TestItGetsOperationsByAccountAndPageLimit(tt *testing.T) { + // l=1586045, t=1, o=1 + // cursor = 6812011404988417, checkpoint=24781 + + cursor := int64(6812011404988417) + ctx := context.Background() + passphrase := "White New England clam chowder" + archive, store := mockArchiveAndIndex(ctx, passphrase) + opsService := OperationsService{ + Config: Config{ + Archive: archive, + IndexStore: store, + Passphrase: passphrase, + }, + } + accountId := "GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX" + // this should start at next checkpoint + ops, err := opsService.GetOperationsByAccount(ctx, cursor, 5, accountId) + require.NoError(tt, err) + require.Len(tt, ops, 2) + require.Equal(tt, ops[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586113)) + require.Equal(tt, ops[0].TxIndex, int32(2)) + require.Equal(tt, ops[1].LedgerHeader.LedgerSeq, xdr.Uint32(1586114)) + require.Equal(tt, ops[1].TxIndex, int32(1)) +} + +func mockArchiveAndIndex(ctx context.Context, passphrase string) (archive.Archive, index.Store) { + + mockArchive := &archive.MockArchive{} + mockReaderLedger1 := &archive.MockLedgerTransactionReader{} + mockReaderLedger2 := &archive.MockLedgerTransactionReader{} + mockReaderLedger3 := &archive.MockLedgerTransactionReader{} + mockReaderLedgerTheRest := &archive.MockLedgerTransactionReader{} + + expectedLedger1 := testLedger(1586112) + expectedLedger2 := testLedger(1586113) + expectedLedger3 := testLedger(1586114) + source := xdr.MustAddress("GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU") + source2 := xdr.MustAddress("GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX") + // assert results iterate sequentially across ops-tx-ledgers + expectedLedger1Transaction1 := testLedgerTx(source, []int{34, 34}, 1) + expectedLedger1Transaction2 := testLedgerTx(source, []int{34}, 2) + expectedLedger2Transaction1 := testLedgerTx(source, []int{34}, 1) + expectedLedger2Transaction2 := testLedgerTx(source2, []int{34}, 2) + expectedLedger3Transaction1 := testLedgerTx(source2, []int{34}, 1) + expectedLedger3Transaction2 := testLedgerTx(source, []int{34}, 2) + + mockArchive.On("GetLedger", ctx, uint32(1586112)).Return(expectedLedger1, nil) + mockArchive.On("GetLedger", ctx, uint32(1586113)).Return(expectedLedger2, nil) + mockArchive.On("GetLedger", ctx, uint32(1586114)).Return(expectedLedger3, nil) + mockArchive.On("GetLedger", ctx, mock.Anything).Return(xdr.LedgerCloseMeta{}, nil) + + mockArchive.On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, expectedLedger1).Return(mockReaderLedger1, nil) + mockArchive.On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, expectedLedger2).Return(mockReaderLedger2, nil) + mockArchive.On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, expectedLedger3).Return(mockReaderLedger3, nil) + mockArchive.On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, mock.Anything).Return(mockReaderLedgerTheRest, nil) + + partialParticipants := make(map[string]struct{}) + partialParticipants[source.Address()] = struct{}{} + allParticipants := make(map[string]struct{}) + allParticipants[source.Address()] = struct{}{} + allParticipants[source2.Address()] = struct{}{} + + mockArchive.On("GetTransactionParticipants", expectedLedger1Transaction1).Return(partialParticipants, nil) + mockArchive.On("GetTransactionParticipants", expectedLedger1Transaction2).Return(partialParticipants, nil) + mockArchive.On("GetTransactionParticipants", expectedLedger2Transaction1).Return(partialParticipants, nil) + mockArchive.On("GetTransactionParticipants", expectedLedger2Transaction2).Return(allParticipants, nil) + mockArchive.On("GetTransactionParticipants", expectedLedger3Transaction1).Return(allParticipants, nil) + mockArchive.On("GetTransactionParticipants", expectedLedger3Transaction2).Return(partialParticipants, nil) + + mockArchive.On("GetOperationParticipants", expectedLedger1Transaction1, mock.Anything, int(0)).Return(partialParticipants, nil) + mockArchive.On("GetOperationParticipants", expectedLedger1Transaction1, mock.Anything, int(1)).Return(partialParticipants, nil) + mockArchive.On("GetOperationParticipants", expectedLedger1Transaction2, mock.Anything, int(0)).Return(partialParticipants, nil) + mockArchive.On("GetOperationParticipants", expectedLedger2Transaction1, mock.Anything, int(0)).Return(partialParticipants, nil) + mockArchive.On("GetOperationParticipants", expectedLedger2Transaction2, mock.Anything, int(0)).Return(allParticipants, nil) + mockArchive.On("GetOperationParticipants", expectedLedger3Transaction1, mock.Anything, int(0)).Return(allParticipants, nil) + mockArchive.On("GetOperationParticipants", expectedLedger3Transaction2, mock.Anything, int(0)).Return(partialParticipants, nil) + + mockReaderLedger1.On("Read").Return(expectedLedger1Transaction1, nil).Once() + mockReaderLedger1.On("Read").Return(expectedLedger1Transaction2, nil).Once() + mockReaderLedger1.On("Read").Return(archive.LedgerTransaction{}, io.EOF).Once() + mockReaderLedger2.On("Read").Return(expectedLedger2Transaction1, nil).Once() + mockReaderLedger2.On("Read").Return(expectedLedger2Transaction2, nil).Once() + mockReaderLedger2.On("Read").Return(archive.LedgerTransaction{}, io.EOF).Once() + mockReaderLedger3.On("Read").Return(expectedLedger3Transaction1, nil).Once() + mockReaderLedger3.On("Read").Return(expectedLedger3Transaction2, nil).Once() + mockReaderLedger3.On("Read").Return(archive.LedgerTransaction{}, io.EOF).Once() + mockReaderLedgerTheRest.On("Read").Return(archive.LedgerTransaction{}, io.EOF) + + mockStore := &index.MockStore{} + mockStore.On("NextActive", "GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX", mock.Anything, uint32(24782)).Return(uint32(24783), nil) + mockStore.On("NextActive", "GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX", mock.Anything, uint32(24783)).Return(uint32(24784), nil) + mockStore.On("NextActive", "GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX", mock.Anything, uint32(24784)).Return(uint32(0), io.EOF) + + return mockArchive, mockStore +} + +func testLedger(seq int) xdr.LedgerCloseMeta { + return xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(seq), + }, + }, + }, + } +} + +func testLedgerTx(source xdr.AccountId, bumpTos []int, txIndex uint32) archive.LedgerTransaction { + + ops := []xdr.Operation{} + for _, bumpTo := range bumpTos { + ops = append(ops, xdr.Operation{ + Body: xdr.OperationBody{ + BumpSequenceOp: &xdr.BumpSequenceOp{ + BumpTo: xdr.SequenceNumber(bumpTo), + }, + }, + }) + } + + tx := archive.LedgerTransaction{ + Envelope: xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + SourceAccount: source.ToMuxedAccount(), + Fee: xdr.Uint32(1), + Operations: ops, + }, + }, + }, + Index: txIndex, + } + + return tx +}