Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(eventindexer): slow md indexing #17816

Merged
merged 14 commits into from
Jul 19, 2024
860 changes: 860 additions & 0 deletions packages/eventindexer/AssignmentHook.json

Large diffs are not rendered by default.

1,637 changes: 1,637 additions & 0 deletions packages/eventindexer/Bridge.json

Large diffs are not rendered by default.

79 changes: 44 additions & 35 deletions packages/eventindexer/indexer/fetch_nft_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package indexer

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"log/slog"
"math/big"
"net/http"
"net/url"
"strings"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
Expand Down Expand Up @@ -54,37 +54,33 @@ func (i *Indexer) fetchNFTMetadata(
return nil, errors.Wrap(err, "contractABI.UnpackIntoInterface")
}

url, err := resolveMetadataURL(ctx, tokenURI)
if err != nil {
if errors.Is(err, eventindexer.ErrInvalidURL) {
slog.Warn("Invalid metadata URI",
"contractAddress", contractAddress,
"tokenID", tokenID.Int64(),
"chainID", chainID.String())

return nil, nil
}
mdURL := resolveMetadataURL(tokenURI)

return nil, errors.Wrap(err, "resolveMetadataURL")
if !isValidURL(mdURL) {
return nil, nil
}

var metadata *eventindexer.NFTMetadata

//nolint
resp, err := http.Get(url)
resp, err := http.Get(mdURL)
if err != nil {
return nil, err
}

defer resp.Body.Close()

var metadata eventindexer.NFTMetadata
if resp.StatusCode != http.StatusOK {
return nil, nil
}

err = json.NewDecoder(resp.Body).Decode(&metadata)
if err != nil {
return nil, err
}

if methodName == "tokenURI" {
if err := i.fetchSymbol(ctx, contractABI, &metadata, contractAddressCommon); err != nil {
if err := i.fetchSymbol(ctx, contractABI, metadata, contractAddressCommon); err != nil {
return nil, err
}
}
Expand All @@ -93,39 +89,52 @@ func (i *Indexer) fetchNFTMetadata(
metadata.TokenID = tokenID.Int64()
metadata.ChainID = chainID.Int64()

return &metadata, nil
return metadata, nil
}

func resolveMetadataURL(ctx context.Context, tokenURI string) (string, error) {
if strings.HasPrefix(tokenURI, "ipfs://") {
ipfsHash := strings.TrimPrefix(tokenURI, "ipfs://")
resolvedURL := fmt.Sprintf("https://ipfs.io/ipfs/%s", ipfsHash)
// isValidURL checks if the given string is a valid URL
func isValidURL(str string) bool {
u, err := url.Parse(str)
if err != nil || u.Scheme == "" || u.Host == "" {
return false
}

if isValidURL(ctx, resolvedURL) {
return resolvedURL, nil
}
return true
}

return "", eventindexer.ErrInvalidURL
// isBase64 checks if the given string is a valid base64 encoded string
func isBase64(str string) bool {
if !strings.Contains(str, "base64,") {
return false
}

if isValidURL(ctx, tokenURI) {
return tokenURI, nil
parts := strings.Split(str, "base64,")
if len(parts) != 2 {
return false
}

return "", eventindexer.ErrInvalidURL
_, err := base64.StdEncoding.DecodeString(parts[1])

return err == nil
}

func isValidURL(ctx context.Context, rawURL string) bool {
client := &http.Client{
Timeout: 3 * time.Second,
func resolveMetadataURL(tokenURI string) string {
if strings.HasPrefix(tokenURI, "ipfs://") {
ipfsHash := strings.TrimPrefix(tokenURI, "ipfs://")
resolvedURL := fmt.Sprintf("https://ipfs.io/ipfs/%s", ipfsHash)

return resolvedURL
}

resp, err := client.Head(rawURL)
if err != nil || resp.StatusCode != http.StatusOK {
return false
if isBase64(tokenURI) {
parts := strings.Split(tokenURI, "base64,")

decodedTokenURI, _ := base64.StdEncoding.DecodeString(parts[1])

return string(decodedTokenURI)
}

return true
return tokenURI
}

func (i *Indexer) fetchSymbol(ctx context.Context, contractABI abi.ABI, metadata *eventindexer.NFTMetadata, contractAddress common.Address) error {
Expand Down
4 changes: 0 additions & 4 deletions packages/eventindexer/indexer/index_erc20_transfers.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,6 @@ func (i *Indexer) saveERC20Transfer(ctx context.Context, chainID *big.Int, vLog
if err != nil {
return errors.Wrap(err, "i.erc20BalanceRepo.CreateMetadata")
}

slog.Info("metadata created", "pk", pk, "symbol", symbol, "decimals", decimals, "contractAddress", vLog.Address.Hex())
} else {
slog.Info("metadata found", "pk", pk, "symbol", md.Symbol, "decimals", md.Decimals, "contractAddress", vLog.Address.Hex())
}

// increment To address's balance
Expand Down
46 changes: 19 additions & 27 deletions packages/eventindexer/indexer/index_nft_transfers.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (i *Indexer) indexNFTTransfers(
}

// isERC1155Transfer determines whether a given log is a valid ERC1155 transfer event
func (i *Indexer) isERC1155Transfer(ctx context.Context, vLog types.Log) bool {
func (i *Indexer) isERC1155Transfer(_ context.Context, vLog types.Log) bool {
// malformed event
if len(vLog.Topics) == 0 {
return false
Expand Down Expand Up @@ -152,20 +152,23 @@ func (i *Indexer) saveERC721Transfer(ctx context.Context, chainID *big.Int, vLog
TokenID: tokenID,
Name: "invalid_metadata",
}
}

metadata, err = i.nftMetadataRepo.SaveNFTMetadata(ctx, metadata)
if err != nil {
return errors.Wrap(err, "i.nftMetadataRepo.SaveNFTMetadata")
metadata, err = i.nftMetadataRepo.SaveNFTMetadata(ctx, metadata)
if err != nil {
return errors.Wrap(err, "i.nftMetadataRepo.SaveNFTMetadata")
}
}

pk = metadata.ID
}

slog.Info("metadata created", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID)
} else {
slog.Info("metadata found", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID)
if pk == 0 {
slog.Warn("unable to create or fetch md", "contractAddress", vLog.Address.Hex())
return nil
}

slog.Info("metadata pk", "pk", pk)

// increment To address's balance
// decrement From address's balance
increaseOpts := eventindexer.UpdateNFTBalanceOpts{
Expand Down Expand Up @@ -257,18 +260,14 @@ func (i *Indexer) saveERC1155Transfer(ctx context.Context, chainID *big.Int, vLo
TokenID: t.Id.Int64(),
Name: "invalid_metadata",
}
}

metadata, err = i.nftMetadataRepo.SaveNFTMetadata(ctx, metadata)
if err != nil {
return errors.Wrap(err, "i.nftMetadataRepo.SaveNFTMetadata")
metadata, err = i.nftMetadataRepo.SaveNFTMetadata(ctx, metadata)
if err != nil {
return errors.Wrap(err, "i.nftMetadataRepo.SaveNFTMetadata")
}
}

pk = metadata.ID

slog.Info("metadata created", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID)
} else {
slog.Info("metadata found", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID)
}

increaseOpts := eventindexer.UpdateNFTBalanceOpts{
Expand Down Expand Up @@ -300,8 +299,6 @@ func (i *Indexer) saveERC1155Transfer(ctx context.Context, chainID *big.Int, vLo
return err
}
} else if vLog.Topics[0].Hex() == transferBatchSignatureHash.Hex() {
slog.Info("erc1155 transfer batch")

type TransferBatchEvent struct {
Operator common.Address
From common.Address
Expand All @@ -320,7 +317,6 @@ func (i *Indexer) saveERC1155Transfer(ctx context.Context, chainID *big.Int, vLo
for idx, id := range t.Ids {
var pk int = 0

slog.Info("ERC1155 BATCH:", "", pk)
// Check if metadata already exists in db, if not fetch and store
metadata, err := i.nftMetadataRepo.GetNFTMetadata(ctx, vLog.Address.Hex(), id.Int64(), chainID.Int64())
if err != nil {
Expand All @@ -344,18 +340,14 @@ func (i *Indexer) saveERC1155Transfer(ctx context.Context, chainID *big.Int, vLo
TokenID: id.Int64(),
Name: "invalid_metadata",
}
}

metadata, err = i.nftMetadataRepo.SaveNFTMetadata(ctx, metadata)
if err != nil {
return errors.Wrap(err, "i.nftMetadataRepo.SaveNFTMetadata")
metadata, err = i.nftMetadataRepo.SaveNFTMetadata(ctx, metadata)
if err != nil {
return errors.Wrap(err, "i.nftMetadataRepo.SaveNFTMetadata")
}
}

pk = metadata.ID

slog.Info("metadata created", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID)
} else {
slog.Info("metadata found", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID)
}

increaseOpts := eventindexer.UpdateNFTBalanceOpts{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE `nft_metadata` ADD INDEX `nft_metadata_contract_address_token_id_chain_id_index` (`contract_address`, `token_id`, `chain_id`),
ADD INDEX `nft_metadata_contract_address_index` (`contract_address`)
;

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
ALTER TABLE nft_metadata DROP INDEX nft_metadata_contract_address_token_id_chain_id_index,
DROP INDEX nft_metadata_contract_address;
-- +goose StatementEnd
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE `nft_balances` ADD INDEX `nft_balances_address_chain_id_amount_index` (`address`, `chain_id`, `amount`);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
ALTER TABLE nft_balances
DROP INDEX nft_balances_address_chain_id_amount_index
-- +goose StatementEnd
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE `erc20_balances` ADD INDEX `erc20_balances_contract_address_address_chain_id_index` (`contract_address`, `address`, `chain_id`);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
ALTER TABLE erc20_balances DROP INDEX erc20_balances_contract_address_address_chain_id_index,
DROP INDEX erc20_balances_contract_address-- +goose StatementEnd
-- +goose StatementEnd
6 changes: 5 additions & 1 deletion packages/eventindexer/pkg/repo/nft_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package repo

import (
"context"
"github.com/taikoxyz/taiko-mono/packages/eventindexer/pkg/db"
"net/http"
"strings"
"time"

"github.com/taikoxyz/taiko-mono/packages/eventindexer/pkg/db"
"golang.org/x/exp/slog"

"github.com/morkid/paginate"
"github.com/pkg/errors"
"github.com/taikoxyz/taiko-mono/packages/eventindexer"
Expand Down Expand Up @@ -137,6 +139,8 @@ func (r *NFTBalanceRepository) IncreaseAndDecreaseBalancesInTx(
}

if strings.Contains(err.Error(), "Deadlock") {
slog.Warn("database deadlock")

retries--

time.Sleep(100 * time.Millisecond) // backoff before retrying
Expand Down
Loading