Skip to content

Commit

Permalink
fix(eventindexer): slow md indexing (#17816)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberhorsey authored Jul 19, 2024
1 parent a513588 commit a82326e
Show file tree
Hide file tree
Showing 9 changed files with 2,597 additions and 67 deletions.
860 changes: 860 additions & 0 deletions packages/eventindexer/AssignmentHook.json

Large diffs are not rendered by default.

1,637 changes: 1,637 additions & 0 deletions packages/eventindexer/Bridge.json

Large diffs are not rendered by default.

79 changes: 44 additions & 35 deletions packages/eventindexer/indexer/fetch_nft_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package indexer

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"log/slog"
"math/big"
"net/http"
"net/url"
"strings"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
Expand Down Expand Up @@ -54,37 +54,33 @@ func (i *Indexer) fetchNFTMetadata(
return nil, errors.Wrap(err, "contractABI.UnpackIntoInterface")
}

url, err := resolveMetadataURL(ctx, tokenURI)
if err != nil {
if errors.Is(err, eventindexer.ErrInvalidURL) {
slog.Warn("Invalid metadata URI",
"contractAddress", contractAddress,
"tokenID", tokenID.Int64(),
"chainID", chainID.String())

return nil, nil
}
mdURL := resolveMetadataURL(tokenURI)

return nil, errors.Wrap(err, "resolveMetadataURL")
if !isValidURL(mdURL) {
return nil, nil
}

var metadata *eventindexer.NFTMetadata

//nolint
resp, err := http.Get(url)
resp, err := http.Get(mdURL)
if err != nil {
return nil, err
}

defer resp.Body.Close()

var metadata eventindexer.NFTMetadata
if resp.StatusCode != http.StatusOK {
return nil, nil
}

err = json.NewDecoder(resp.Body).Decode(&metadata)
if err != nil {
return nil, err
}

if methodName == "tokenURI" {
if err := i.fetchSymbol(ctx, contractABI, &metadata, contractAddressCommon); err != nil {
if err := i.fetchSymbol(ctx, contractABI, metadata, contractAddressCommon); err != nil {
return nil, err
}
}
Expand All @@ -93,39 +89,52 @@ func (i *Indexer) fetchNFTMetadata(
metadata.TokenID = tokenID.Int64()
metadata.ChainID = chainID.Int64()

return &metadata, nil
return metadata, nil
}

func resolveMetadataURL(ctx context.Context, tokenURI string) (string, error) {
if strings.HasPrefix(tokenURI, "ipfs://") {
ipfsHash := strings.TrimPrefix(tokenURI, "ipfs://")
resolvedURL := fmt.Sprintf("https://ipfs.io/ipfs/%s", ipfsHash)
// isValidURL checks if the given string is a valid URL
func isValidURL(str string) bool {
u, err := url.Parse(str)
if err != nil || u.Scheme == "" || u.Host == "" {
return false
}

if isValidURL(ctx, resolvedURL) {
return resolvedURL, nil
}
return true
}

return "", eventindexer.ErrInvalidURL
// isBase64 checks if the given string is a valid base64 encoded string
func isBase64(str string) bool {
if !strings.Contains(str, "base64,") {
return false
}

if isValidURL(ctx, tokenURI) {
return tokenURI, nil
parts := strings.Split(str, "base64,")
if len(parts) != 2 {
return false
}

return "", eventindexer.ErrInvalidURL
_, err := base64.StdEncoding.DecodeString(parts[1])

return err == nil
}

func isValidURL(ctx context.Context, rawURL string) bool {
client := &http.Client{
Timeout: 3 * time.Second,
func resolveMetadataURL(tokenURI string) string {
if strings.HasPrefix(tokenURI, "ipfs://") {
ipfsHash := strings.TrimPrefix(tokenURI, "ipfs://")
resolvedURL := fmt.Sprintf("https://ipfs.io/ipfs/%s", ipfsHash)

return resolvedURL
}

resp, err := client.Head(rawURL)
if err != nil || resp.StatusCode != http.StatusOK {
return false
if isBase64(tokenURI) {
parts := strings.Split(tokenURI, "base64,")

decodedTokenURI, _ := base64.StdEncoding.DecodeString(parts[1])

return string(decodedTokenURI)
}

return true
return tokenURI
}

func (i *Indexer) fetchSymbol(ctx context.Context, contractABI abi.ABI, metadata *eventindexer.NFTMetadata, contractAddress common.Address) error {
Expand Down
4 changes: 0 additions & 4 deletions packages/eventindexer/indexer/index_erc20_transfers.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,6 @@ func (i *Indexer) saveERC20Transfer(ctx context.Context, chainID *big.Int, vLog
if err != nil {
return errors.Wrap(err, "i.erc20BalanceRepo.CreateMetadata")
}

slog.Info("metadata created", "pk", pk, "symbol", symbol, "decimals", decimals, "contractAddress", vLog.Address.Hex())
} else {
slog.Info("metadata found", "pk", pk, "symbol", md.Symbol, "decimals", md.Decimals, "contractAddress", vLog.Address.Hex())
}

// increment To address's balance
Expand Down
46 changes: 19 additions & 27 deletions packages/eventindexer/indexer/index_nft_transfers.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (i *Indexer) indexNFTTransfers(
}

// isERC1155Transfer determines whether a given log is a valid ERC1155 transfer event
func (i *Indexer) isERC1155Transfer(ctx context.Context, vLog types.Log) bool {
func (i *Indexer) isERC1155Transfer(_ context.Context, vLog types.Log) bool {
// malformed event
if len(vLog.Topics) == 0 {
return false
Expand Down Expand Up @@ -152,20 +152,23 @@ func (i *Indexer) saveERC721Transfer(ctx context.Context, chainID *big.Int, vLog
TokenID: tokenID,
Name: "invalid_metadata",
}
}

metadata, err = i.nftMetadataRepo.SaveNFTMetadata(ctx, metadata)
if err != nil {
return errors.Wrap(err, "i.nftMetadataRepo.SaveNFTMetadata")
metadata, err = i.nftMetadataRepo.SaveNFTMetadata(ctx, metadata)
if err != nil {
return errors.Wrap(err, "i.nftMetadataRepo.SaveNFTMetadata")
}
}

pk = metadata.ID
}

slog.Info("metadata created", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID)
} else {
slog.Info("metadata found", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID)
if pk == 0 {
slog.Warn("unable to create or fetch md", "contractAddress", vLog.Address.Hex())
return nil
}

slog.Info("metadata pk", "pk", pk)

// increment To address's balance
// decrement From address's balance
increaseOpts := eventindexer.UpdateNFTBalanceOpts{
Expand Down Expand Up @@ -257,18 +260,14 @@ func (i *Indexer) saveERC1155Transfer(ctx context.Context, chainID *big.Int, vLo
TokenID: t.Id.Int64(),
Name: "invalid_metadata",
}
}

metadata, err = i.nftMetadataRepo.SaveNFTMetadata(ctx, metadata)
if err != nil {
return errors.Wrap(err, "i.nftMetadataRepo.SaveNFTMetadata")
metadata, err = i.nftMetadataRepo.SaveNFTMetadata(ctx, metadata)
if err != nil {
return errors.Wrap(err, "i.nftMetadataRepo.SaveNFTMetadata")
}
}

pk = metadata.ID

slog.Info("metadata created", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID)
} else {
slog.Info("metadata found", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID)
}

increaseOpts := eventindexer.UpdateNFTBalanceOpts{
Expand Down Expand Up @@ -300,8 +299,6 @@ func (i *Indexer) saveERC1155Transfer(ctx context.Context, chainID *big.Int, vLo
return err
}
} else if vLog.Topics[0].Hex() == transferBatchSignatureHash.Hex() {
slog.Info("erc1155 transfer batch")

type TransferBatchEvent struct {
Operator common.Address
From common.Address
Expand All @@ -320,7 +317,6 @@ func (i *Indexer) saveERC1155Transfer(ctx context.Context, chainID *big.Int, vLo
for idx, id := range t.Ids {
var pk int = 0

slog.Info("ERC1155 BATCH:", "", pk)
// Check if metadata already exists in db, if not fetch and store
metadata, err := i.nftMetadataRepo.GetNFTMetadata(ctx, vLog.Address.Hex(), id.Int64(), chainID.Int64())
if err != nil {
Expand All @@ -344,18 +340,14 @@ func (i *Indexer) saveERC1155Transfer(ctx context.Context, chainID *big.Int, vLo
TokenID: id.Int64(),
Name: "invalid_metadata",
}
}

metadata, err = i.nftMetadataRepo.SaveNFTMetadata(ctx, metadata)
if err != nil {
return errors.Wrap(err, "i.nftMetadataRepo.SaveNFTMetadata")
metadata, err = i.nftMetadataRepo.SaveNFTMetadata(ctx, metadata)
if err != nil {
return errors.Wrap(err, "i.nftMetadataRepo.SaveNFTMetadata")
}
}

pk = metadata.ID

slog.Info("metadata created", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID)
} else {
slog.Info("metadata found", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID)
}

increaseOpts := eventindexer.UpdateNFTBalanceOpts{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE `nft_metadata` ADD INDEX `nft_metadata_contract_address_token_id_chain_id_index` (`contract_address`, `token_id`, `chain_id`),
ADD INDEX `nft_metadata_contract_address_index` (`contract_address`)
;

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
ALTER TABLE nft_metadata DROP INDEX nft_metadata_contract_address_token_id_chain_id_index,
DROP INDEX nft_metadata_contract_address;
-- +goose StatementEnd
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE `nft_balances` ADD INDEX `nft_balances_address_chain_id_amount_index` (`address`, `chain_id`, `amount`);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
ALTER TABLE nft_balances
DROP INDEX nft_balances_address_chain_id_amount_index
-- +goose StatementEnd
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE `erc20_balances` ADD INDEX `erc20_balances_contract_address_address_chain_id_index` (`contract_address`, `address`, `chain_id`);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
ALTER TABLE erc20_balances DROP INDEX erc20_balances_contract_address_address_chain_id_index,
DROP INDEX erc20_balances_contract_address-- +goose StatementEnd
-- +goose StatementEnd
6 changes: 5 additions & 1 deletion packages/eventindexer/pkg/repo/nft_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package repo

import (
"context"
"github.com/taikoxyz/taiko-mono/packages/eventindexer/pkg/db"
"net/http"
"strings"
"time"

"github.com/taikoxyz/taiko-mono/packages/eventindexer/pkg/db"
"golang.org/x/exp/slog"

"github.com/morkid/paginate"
"github.com/pkg/errors"
"github.com/taikoxyz/taiko-mono/packages/eventindexer"
Expand Down Expand Up @@ -137,6 +139,8 @@ func (r *NFTBalanceRepository) IncreaseAndDecreaseBalancesInTx(
}

if strings.Contains(err.Error(), "Deadlock") {
slog.Warn("database deadlock")

retries--

time.Sleep(100 * time.Millisecond) // backoff before retrying
Expand Down

0 comments on commit a82326e

Please sign in to comment.