From d0e25ba6d1a871d4e6153fba663ccdfa63f16f05 Mon Sep 17 00:00:00 2001 From: K-KAD <54041472+k-kaddal@users.noreply.github.com> Date: Tue, 2 Jul 2024 11:52:45 +0100 Subject: [PATCH] feat(eventindexer): indexing nft metadata (#17538) Co-authored-by: Jeffery Walsh Co-authored-by: jeff <113397187+cyberhorsey@users.noreply.github.com> Co-authored-by: cyberhorsey --- packages/eventindexer/api/api.go | 6 + packages/eventindexer/contracts/erc721/abi.go | 39 ++++ packages/eventindexer/errors.go | 5 + .../indexer/fetch_nft_metadata.go | 171 ++++++++++++++++++ .../indexer/index_erc20_transfers.go | 45 ++++- .../indexer/index_nft_transfers.go | 156 +++++++++++++++- .../indexer/index_raw_block_data.go | 63 ++++--- packages/eventindexer/indexer/indexer.go | 12 ++ .../indexer/save_block_proposed_event.go | 39 ++-- .../indexer/save_block_verified_event.go | 23 ++- .../indexer/save_message_sent_event.go | 26 ++- .../indexer/save_transition_proved_event.go | 23 ++- .../set_initial_processing_block_height.go | 3 + ...270906208845_create_nft_metadata_table.sql | 23 +++ ...6208846_alter_nft_metadata_add_indexes.sql | 10 + ...ft_metadata_table_update_symbol_length.sql | 9 + ...alter_nft_balances_add_metadata_column.sql | 14 ++ packages/eventindexer/nft_balance.go | 17 +- packages/eventindexer/nft_metadata.go | 102 +++++++++++ ...et_nft_balances_by_address_and_chain_id.go | 26 +++ packages/eventindexer/pkg/http/server.go | 7 + packages/eventindexer/pkg/http/server_test.go | 32 +++- .../pkg/mock/nft_metadata_repository.go | 57 ++++++ .../eventindexer/pkg/repo/erc20_balance.go | 36 +++- packages/eventindexer/pkg/repo/nft_balance.go | 38 +++- .../eventindexer/pkg/repo/nft_balance_test.go | 18 ++ .../eventindexer/pkg/repo/nft_metadata.go | 91 ++++++++++ 27 files changed, 991 insertions(+), 100 deletions(-) create mode 100644 packages/eventindexer/contracts/erc721/abi.go create mode 100644 packages/eventindexer/indexer/fetch_nft_metadata.go create mode 100644 packages/eventindexer/migrations/20270906208845_create_nft_metadata_table.sql create mode 100644 packages/eventindexer/migrations/20270906208846_alter_nft_metadata_add_indexes.sql create mode 100644 packages/eventindexer/migrations/20270906208847_alter_nft_metadata_table_update_symbol_length.sql create mode 100644 packages/eventindexer/migrations/20270906208848_alter_nft_balances_add_metadata_column.sql create mode 100644 packages/eventindexer/nft_metadata.go create mode 100644 packages/eventindexer/pkg/mock/nft_metadata_repository.go create mode 100644 packages/eventindexer/pkg/repo/nft_metadata.go diff --git a/packages/eventindexer/api/api.go b/packages/eventindexer/api/api.go index 0f36b23160e..ff77cdce199 100644 --- a/packages/eventindexer/api/api.go +++ b/packages/eventindexer/api/api.go @@ -72,6 +72,11 @@ func InitFromConfig(ctx context.Context, api *API, cfg *Config) error { return err } + nftMetadataRepository, err := repo.NewNFTMetadataRepository(db) + if err != nil { + return err + } + ethClient, err := ethclient.Dial(cfg.RPCUrl) if err != nil { return err @@ -80,6 +85,7 @@ func InitFromConfig(ctx context.Context, api *API, cfg *Config) error { srv, err := http.NewServer(http.NewServerOpts{ EventRepo: eventRepository, NFTBalanceRepo: nftBalanceRepository, + NFTMetadataRepo: nftMetadataRepository, ERC20BalanceRepo: erc20BalanceRepository, ChartRepo: chartRepository, Echo: echo.New(), diff --git a/packages/eventindexer/contracts/erc721/abi.go b/packages/eventindexer/contracts/erc721/abi.go new file mode 100644 index 00000000000..02523857c28 --- /dev/null +++ b/packages/eventindexer/contracts/erc721/abi.go @@ -0,0 +1,39 @@ +package erc721 + +var ( + ABI = `[ + { + "constant":true, + "inputs":[ + { + "name":"_tokenId", + "type":"uint256" + } + ], + "name":"tokenURI", + "outputs":[ + { + "name":"", + "type":"string" + } + ], + "payable":false, + "stateMutability":"view", + "type":"function" + }, + { + "constant": true, + "inputs": [], + "name": "symbol", + "outputs": [ + { + "name": "", + "type": "string" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + } + ]` +) diff --git a/packages/eventindexer/errors.go b/packages/eventindexer/errors.go index 73e8b651300..8a2e833b55d 100644 --- a/packages/eventindexer/errors.go +++ b/packages/eventindexer/errors.go @@ -12,6 +12,10 @@ var ( "ERR_NO_NFT_BALANCE_REPOSITORY", "NFTBalanceRepository is required", ) + ErrNoNFTMetadataRepository = errors.Validation.NewWithKeyAndDetail( + "ERR_NO_NFT_METADATA_REPOSITORY", + "NFTMetadataRepository is required", + ) ErrNoStatRepository = errors.Validation.NewWithKeyAndDetail( "ERR_NO_STAT_REPOSITORY", "StatRepository is required", @@ -23,4 +27,5 @@ var ( ErrNoCORSOrigins = errors.Validation.NewWithKeyAndDetail("ERR_NO_CORS_ORIGINS", "CORS Origins are required") ErrNoRPCClient = errors.Validation.NewWithKeyAndDetail("ERR_NO_RPC_CLIENT", "RPCClient is required") ErrInvalidMode = errors.Validation.NewWithKeyAndDetail("ERR_INVALID_MODE", "Mode not supported") + ErrInvalidURL = errors.Validation.NewWithKeyAndDetail("ERR_INVALID_URL", "The provided URL is invalid or unreachable") ) diff --git a/packages/eventindexer/indexer/fetch_nft_metadata.go b/packages/eventindexer/indexer/fetch_nft_metadata.go new file mode 100644 index 00000000000..63dfa96d527 --- /dev/null +++ b/packages/eventindexer/indexer/fetch_nft_metadata.go @@ -0,0 +1,171 @@ +package indexer + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "math/big" + "net/http" + "strings" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + "github.com/taikoxyz/taiko-mono/packages/eventindexer" + "github.com/taikoxyz/taiko-mono/packages/eventindexer/contracts/erc1155" + "github.com/taikoxyz/taiko-mono/packages/eventindexer/contracts/erc721" +) + +func (i *Indexer) fetchNFTMetadata( + ctx context.Context, contractAddress string, + tokenID *big.Int, + abiJSON string, + methodName string, + chainID *big.Int) (*eventindexer.NFTMetadata, error) { + contractABI, err := abi.JSON(strings.NewReader(abiJSON)) + if err != nil { + return nil, err + } + + contractAddressCommon := common.HexToAddress(contractAddress) + + callData, err := contractABI.Pack(methodName, tokenID) + if err != nil { + return nil, err + } + + msg := ethereum.CallMsg{ + To: &contractAddressCommon, + Data: callData, + } + + result, err := i.ethClient.CallContract(ctx, msg, nil) + if err != nil { + return nil, errors.Wrap(err, "i.ethClient.CallContract") + } + + var tokenURI string + + err = contractABI.UnpackIntoInterface(&tokenURI, methodName, result) + if err != nil { + return nil, errors.Wrap(err, "contractABI.UnpackIntoInterface") + } + + url, err := resolveMetadataURL(ctx, tokenURI) + if err != nil { + if errors.Is(err, eventindexer.ErrInvalidURL) { + slog.Warn("Invalid metadata URI", + "contractAddress", contractAddress, + "tokenID", tokenID.Int64(), + "chainID", chainID.String()) + + return nil, nil + } + + return nil, errors.Wrap(err, "resolveMetadataURL") + } + + //nolint + resp, err := http.Get(url) + if err != nil { + return nil, err + } + + defer resp.Body.Close() + + var metadata eventindexer.NFTMetadata + + err = json.NewDecoder(resp.Body).Decode(&metadata) + if err != nil { + return nil, err + } + + if methodName == "tokenURI" { + if err := i.fetchSymbol(ctx, contractABI, &metadata, contractAddressCommon); err != nil { + return nil, err + } + } + + metadata.ContractAddress = contractAddress + metadata.TokenID = tokenID.Int64() + metadata.ChainID = chainID.Int64() + + return &metadata, nil +} + +func resolveMetadataURL(ctx context.Context, tokenURI string) (string, error) { + if strings.HasPrefix(tokenURI, "ipfs://") { + ipfsHash := strings.TrimPrefix(tokenURI, "ipfs://") + resolvedURL := fmt.Sprintf("https://ipfs.io/ipfs/%s", ipfsHash) + + if isValidURL(ctx, resolvedURL) { + return resolvedURL, nil + } + + return "", eventindexer.ErrInvalidURL + } + + if isValidURL(ctx, tokenURI) { + return tokenURI, nil + } + + return "", eventindexer.ErrInvalidURL +} + +func isValidURL(ctx context.Context, rawURL string) bool { + client := &http.Client{ + Timeout: 3 * time.Second, + } + + resp, err := client.Head(rawURL) + if err != nil || resp.StatusCode != http.StatusOK { + return false + } + + return true +} + +func (i *Indexer) fetchSymbol(ctx context.Context, contractABI abi.ABI, metadata *eventindexer.NFTMetadata, contractAddress common.Address) error { + symbolCallData, err := contractABI.Pack("symbol") + if err != nil { + return errors.Wrap(err, "contractABI.Pack") + } + + symbolMsg := ethereum.CallMsg{ + To: &contractAddress, + Data: symbolCallData, + } + + symbolResult, err := i.ethClient.CallContract(ctx, symbolMsg, nil) + if err != nil { + return errors.Wrap(err, "i.ethClient.CallContract(symbolMsg)") + } + + var symbol string + + err = contractABI.UnpackIntoInterface(&symbol, "symbol", symbolResult) + if err != nil { + return errors.Wrap(err, "contractABI.UnpackIntoInterface") + } + + metadata.Symbol = symbol + + return nil +} + +func (i *Indexer) fetchERC721Metadata(ctx context.Context, + contractAddress string, + tokenID *big.Int, + chainID *big.Int) (*eventindexer.NFTMetadata, error) { + return i.fetchNFTMetadata(ctx, contractAddress, tokenID, erc721.ABI, "tokenURI", chainID) +} + +func (i *Indexer) fetchERC1155Metadata(ctx context.Context, + contractAddress string, + tokenID *big.Int, + chainID *big.Int) (*eventindexer.NFTMetadata, error) { + return i.fetchNFTMetadata(ctx, contractAddress, tokenID, erc1155.ABI, "uri", chainID) +} diff --git a/packages/eventindexer/indexer/index_erc20_transfers.go b/packages/eventindexer/indexer/index_erc20_transfers.go index 1c50f5a5166..d958bdfc7cc 100644 --- a/packages/eventindexer/indexer/index_erc20_transfers.go +++ b/packages/eventindexer/indexer/index_erc20_transfers.go @@ -15,6 +15,7 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "github.com/pkg/errors" "github.com/taikoxyz/taiko-mono/packages/eventindexer" + "golang.org/x/sync/errgroup" ) // nolint: lll @@ -30,14 +31,26 @@ func (i *Indexer) indexERC20Transfers( chainID *big.Int, logs []types.Log, ) error { + wg, ctx := errgroup.WithContext(ctx) + for _, vLog := range logs { - if !i.isERC20Transfer(ctx, vLog) { - continue - } + l := vLog - if err := i.saveERC20Transfer(ctx, chainID, vLog); err != nil { - return err - } + wg.Go(func() error { + if !i.isERC20Transfer(ctx, l) { + return nil + } + + if err := i.saveERC20Transfer(ctx, chainID, l); err != nil { + return err + } + + return nil + }) + } + + if err := wg.Wait(); err != nil { + return err } return nil @@ -100,13 +113,27 @@ func (i *Indexer) saveERC20Transfer(ctx context.Context, chainID *big.Int, vLog var pk int = 0 - md, err := i.erc20BalanceRepo.FindMetadata(ctx, chainID.Int64(), vLog.Address.Hex()) - if err != nil { - return errors.Wrap(err, "i.erc20BalanceRepo") + i.contractToMetadataMutex.Lock() + + md, ok := i.contractToMetadata[vLog.Address] + + i.contractToMetadataMutex.Unlock() + + if !ok { + md, err = i.erc20BalanceRepo.FindMetadata(ctx, chainID.Int64(), vLog.Address.Hex()) + if err != nil { + return errors.Wrap(err, "i.erc20BalanceRepo") + } } if md != nil { pk = md.ID + + i.contractToMetadataMutex.Lock() + + i.contractToMetadata[vLog.Address] = md + + i.contractToMetadataMutex.Unlock() } if pk == 0 { diff --git a/packages/eventindexer/indexer/index_nft_transfers.go b/packages/eventindexer/indexer/index_nft_transfers.go index f74295f7d7d..72d1d1de9c6 100644 --- a/packages/eventindexer/indexer/index_nft_transfers.go +++ b/packages/eventindexer/indexer/index_nft_transfers.go @@ -15,6 +15,7 @@ import ( "github.com/pkg/errors" "github.com/taikoxyz/taiko-mono/packages/eventindexer" "github.com/taikoxyz/taiko-mono/packages/eventindexer/contracts/erc1155" + "golang.org/x/sync/errgroup" ) var ( @@ -33,14 +34,26 @@ func (i *Indexer) indexNFTTransfers( chainID *big.Int, logs []types.Log, ) error { + nftWg, ctx := errgroup.WithContext(ctx) + for _, vLog := range logs { - if !i.isERC721Transfer(ctx, vLog) && !i.isERC1155Transfer(ctx, vLog) { - continue - } + l := vLog - if err := i.saveNFTTransfer(ctx, chainID, vLog); err != nil { - return err - } + nftWg.Go(func() error { + if !i.isERC721Transfer(ctx, l) && !i.isERC1155Transfer(ctx, l) { + return nil + } + + if err := i.saveNFTTransfer(ctx, chainID, l); err != nil { + return errors.Wrap(err, "i.saveNFTTransfer") + } + + return nil + }) + } + + if err := nftWg.Wait(); err != nil { + return err } return nil @@ -102,9 +115,7 @@ func (i *Indexer) saveNFTTransfer(ctx context.Context, chainID *big.Int, vLog ty // saveERC721Transfer updates the user's balances on the from and to of a ERC721 transfer event func (i *Indexer) saveERC721Transfer(ctx context.Context, chainID *big.Int, vLog types.Log) error { from := fmt.Sprintf("0x%v", common.Bytes2Hex(vLog.Topics[1].Bytes()[12:])) - to := fmt.Sprintf("0x%v", common.Bytes2Hex(vLog.Topics[2].Bytes()[12:])) - tokenID := vLog.Topics[3].Big().Int64() slog.Info( @@ -115,10 +126,51 @@ func (i *Indexer) saveERC721Transfer(ctx context.Context, chainID *big.Int, vLog "contractAddress", vLog.Address.Hex(), ) + var pk int = 0 + + // Check if metadata already exists in db, if not fetch and store + metadata, err := i.nftMetadataRepo.GetNFTMetadata(ctx, vLog.Address.Hex(), tokenID, chainID.Int64()) + if err != nil { + return err + } + + if metadata != nil { + pk = metadata.ID + } + + if pk == 0 { + metadata, err = i.fetchERC721Metadata(ctx, vLog.Address.Hex(), vLog.Topics[3].Big(), chainID) + if err != nil { + slog.Warn("error fetching metadata. setting defaults", + "contractAddress", vLog.Address.Hex(), "error", err.Error()) + } + + if metadata == nil { + metadata = &eventindexer.NFTMetadata{ + ChainID: chainID.Int64(), + ContractAddress: vLog.Address.Hex(), + TokenID: tokenID, + Name: "invalid_metadata", + } + } + + metadata, err = i.nftMetadataRepo.SaveNFTMetadata(ctx, metadata) + if err != nil { + return errors.Wrap(err, "i.nftMetadataRepo.SaveNFTMetadata") + } + + pk = metadata.ID + + slog.Info("metadata created", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID) + } else { + slog.Info("metadata found", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID) + } + // increment To address's balance // decrement From address's balance increaseOpts := eventindexer.UpdateNFTBalanceOpts{ ChainID: chainID.Int64(), + NftMetadataId: int64(pk), Address: to, TokenID: tokenID, ContractAddress: vLog.Address.Hex(), @@ -131,6 +183,7 @@ func (i *Indexer) saveERC721Transfer(ctx context.Context, chainID *big.Int, vLog if from != ZeroAddress.Hex() { decreaseOpts = eventindexer.UpdateNFTBalanceOpts{ ChainID: chainID.Int64(), + NftMetadataId: int64(pk), Address: from, TokenID: tokenID, ContractAddress: vLog.Address.Hex(), @@ -139,7 +192,7 @@ func (i *Indexer) saveERC721Transfer(ctx context.Context, chainID *big.Int, vLog } } - _, _, err := i.nftBalanceRepo.IncreaseAndDecreaseBalancesInTx(ctx, increaseOpts, decreaseOpts) + _, _, err = i.nftBalanceRepo.IncreaseAndDecreaseBalancesInTx(ctx, increaseOpts, decreaseOpts) if err != nil { return err } @@ -151,7 +204,6 @@ func (i *Indexer) saveERC721Transfer(ctx context.Context, chainID *big.Int, vLog // the database and updates the user's balances func (i *Indexer) saveERC1155Transfer(ctx context.Context, chainID *big.Int, vLog types.Log) error { from := fmt.Sprintf("0x%v", common.Bytes2Hex(vLog.Topics[2].Bytes()[12:])) - to := fmt.Sprintf("0x%v", common.Bytes2Hex(vLog.Topics[3].Bytes()[12:])) slog.Info("erc1155 found") @@ -179,8 +231,49 @@ func (i *Indexer) saveERC1155Transfer(ctx context.Context, chainID *big.Int, vLo return err } + var pk int = 0 + + // Check if metadata already exists in db, if not fetch and store + metadata, err := i.nftMetadataRepo.GetNFTMetadata(ctx, vLog.Address.Hex(), t.Id.Int64(), chainID.Int64()) + if err != nil { + return err + } + + if metadata != nil { + pk = metadata.ID + } + + if pk == 0 { + metadata, err = i.fetchERC1155Metadata(ctx, vLog.Address.Hex(), t.Id, chainID) + if err != nil { + slog.Warn("error fetching metadata. setting defaults", + "contractAddress", vLog.Address.Hex(), "error", err.Error()) + } + + if metadata == nil { + metadata = &eventindexer.NFTMetadata{ + ChainID: chainID.Int64(), + ContractAddress: vLog.Address.Hex(), + TokenID: t.Id.Int64(), + Name: "invalid_metadata", + } + } + + metadata, err = i.nftMetadataRepo.SaveNFTMetadata(ctx, metadata) + if err != nil { + return errors.Wrap(err, "i.nftMetadataRepo.SaveNFTMetadata") + } + + pk = metadata.ID + + slog.Info("metadata created", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID) + } else { + slog.Info("metadata found", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID) + } + increaseOpts := eventindexer.UpdateNFTBalanceOpts{ ChainID: chainID.Int64(), + NftMetadataId: int64(pk), Address: to, TokenID: t.Id.Int64(), ContractAddress: vLog.Address.Hex(), @@ -193,6 +286,7 @@ func (i *Indexer) saveERC1155Transfer(ctx context.Context, chainID *big.Int, vLo // decrement From address's balance decreaseOpts = eventindexer.UpdateNFTBalanceOpts{ ChainID: chainID.Int64(), + NftMetadataId: int64(pk), Address: from, TokenID: t.Id.Int64(), ContractAddress: vLog.Address.Hex(), @@ -224,8 +318,49 @@ func (i *Indexer) saveERC1155Transfer(ctx context.Context, chainID *big.Int, vLo } for idx, id := range t.Ids { + var pk int = 0 + + slog.Info("ERC1155 BATCH:", "", pk) + // Check if metadata already exists in db, if not fetch and store + metadata, err := i.nftMetadataRepo.GetNFTMetadata(ctx, vLog.Address.Hex(), id.Int64(), chainID.Int64()) + if err != nil { + return err + } + + if metadata != nil { + pk = metadata.ID + } + + if pk == 0 { + metadata, err = i.fetchERC1155Metadata(ctx, vLog.Address.Hex(), id, chainID) + if err != nil { + return err + } + + if metadata == nil { + metadata = &eventindexer.NFTMetadata{ + ChainID: chainID.Int64(), + ContractAddress: vLog.Address.Hex(), + TokenID: id.Int64(), + Name: "invalid_metadata", + } + } + + metadata, err = i.nftMetadataRepo.SaveNFTMetadata(ctx, metadata) + if err != nil { + return errors.Wrap(err, "i.nftMetadataRepo.SaveNFTMetadata") + } + + pk = metadata.ID + + slog.Info("metadata created", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID) + } else { + slog.Info("metadata found", "contractAddress", vLog.Address.Hex(), "tokenId", metadata.TokenID) + } + increaseOpts := eventindexer.UpdateNFTBalanceOpts{ ChainID: chainID.Int64(), + NftMetadataId: int64(pk), Address: to, TokenID: id.Int64(), ContractAddress: vLog.Address.Hex(), @@ -238,6 +373,7 @@ func (i *Indexer) saveERC1155Transfer(ctx context.Context, chainID *big.Int, vLo // decrement From address's balance decreaseOpts = eventindexer.UpdateNFTBalanceOpts{ ChainID: chainID.Int64(), + NftMetadataId: int64(pk), Address: from, TokenID: id.Int64(), ContractAddress: vLog.Address.Hex(), diff --git a/packages/eventindexer/indexer/index_raw_block_data.go b/packages/eventindexer/indexer/index_raw_block_data.go index f79edf6dcbd..f37c0defbf7 100644 --- a/packages/eventindexer/indexer/index_raw_block_data.go +++ b/packages/eventindexer/indexer/index_raw_block_data.go @@ -38,32 +38,45 @@ func (i *Indexer) indexRawBlockData( txs := block.Transactions() + txWg, ctx := errgroup.WithContext(ctx) + for _, tx := range txs { - slog.Info("transaction found", "hash", tx.Hash()) - receipt, err := i.ethClient.TransactionReceipt(ctx, tx.Hash()) - - if err != nil { - return err - } - - sender, err := i.ethClient.TransactionSender(ctx, tx, block.Hash(), receipt.TransactionIndex) - if err != nil { - return err - } - - if err := i.accountRepo.Save(ctx, sender, time.Unix(int64(block.Time()), 0)); err != nil { - return err - } - - if err := i.txRepo.Save(ctx, - tx, - sender, - block.Number(), - time.Unix(int64(block.Time()), 0), - receipt.ContractAddress, - ); err != nil { - return err - } + t := tx + + txWg.Go(func() error { + slog.Info("transaction found", "hash", t.Hash()) + + receipt, err := i.ethClient.TransactionReceipt(ctx, t.Hash()) + + if err != nil { + return errors.Wrap(err, "i.ethClient.TransactionReceipt") + } + + sender, err := i.ethClient.TransactionSender(ctx, t, block.Hash(), receipt.TransactionIndex) + if err != nil { + return errors.Wrap(err, "i.ethClient.TransactionSender") + } + + if err := i.accountRepo.Save(ctx, sender, time.Unix(int64(block.Time()), 0)); err != nil { + return errors.Wrap(err, "i.accountRepo.Save") + } + + if err := i.txRepo.Save(ctx, + t, + sender, + block.Number(), + time.Unix(int64(block.Time()), 0), + receipt.ContractAddress, + ); err != nil { + return errors.Wrap(err, "i.txRepo.Save") + } + + return nil + }) + } + + if err := txWg.Wait(); err != nil { + return err } return nil diff --git a/packages/eventindexer/indexer/indexer.go b/packages/eventindexer/indexer/indexer.go index 48f601caace..cec7189d1be 100644 --- a/packages/eventindexer/indexer/indexer.go +++ b/packages/eventindexer/indexer/indexer.go @@ -37,6 +37,7 @@ type Indexer struct { accountRepo eventindexer.AccountRepository eventRepo eventindexer.EventRepository nftBalanceRepo eventindexer.NFTBalanceRepository + nftMetadataRepo eventindexer.NFTMetadataRepository erc20BalanceRepo eventindexer.ERC20BalanceRepository txRepo eventindexer.TransactionRepository @@ -61,6 +62,9 @@ type Indexer struct { syncMode SyncMode blockSaveMutex *sync.Mutex + + contractToMetadata map[common.Address]*eventindexer.ERC20Metadata + contractToMetadataMutex *sync.Mutex } func (i *Indexer) Start() error { @@ -137,6 +141,11 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) error { return err } + nftMetadataRepository, err := repo.NewNFTMetadataRepository(db) + if err != nil { + return err + } + txRepository, err := repo.NewTransactionRepository(db) if err != nil { return err @@ -179,6 +188,7 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) error { i.eventRepo = eventRepository i.nftBalanceRepo = nftBalanceRepository i.erc20BalanceRepo = erc20BalanceRepository + i.nftMetadataRepo = nftMetadataRepository i.txRepo = txRepository i.srcChainID = chainID.Uint64() @@ -194,6 +204,8 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) error { i.indexNfts = cfg.IndexNFTs i.indexERC20s = cfg.IndexERC20s i.layer = cfg.Layer + i.contractToMetadata = make(map[common.Address]*eventindexer.ERC20Metadata, 0) + i.contractToMetadataMutex = &sync.Mutex{} return nil } diff --git a/packages/eventindexer/indexer/save_block_proposed_event.go b/packages/eventindexer/indexer/save_block_proposed_event.go index 030a01ddc0d..e8dc2669847 100644 --- a/packages/eventindexer/indexer/save_block_proposed_event.go +++ b/packages/eventindexer/indexer/save_block_proposed_event.go @@ -12,6 +12,7 @@ import ( "github.com/pkg/errors" "github.com/taikoxyz/taiko-mono/packages/eventindexer" "github.com/taikoxyz/taiko-mono/packages/eventindexer/contracts/taikol1" + "golang.org/x/sync/errgroup" ) func (i *Indexer) saveBlockProposedEvents( @@ -24,29 +25,41 @@ func (i *Indexer) saveBlockProposedEvents( return nil } + wg, ctx := errgroup.WithContext(ctx) + for { event := events.Event - tx, _, err := i.ethClient.TransactionByHash(ctx, event.Raw.TxHash) - if err != nil { - return errors.Wrap(err, "i.ethClient.TransactionByHash") - } + wg.Go(func() error { + tx, _, err := i.ethClient.TransactionByHash(ctx, event.Raw.TxHash) + if err != nil { + return errors.Wrap(err, "i.ethClient.TransactionByHash") + } - sender, err := i.ethClient.TransactionSender(ctx, tx, event.Raw.BlockHash, event.Raw.TxIndex) - if err != nil { - return errors.Wrap(err, "i.ethClient.TransactionSender") - } + sender, err := i.ethClient.TransactionSender(ctx, tx, event.Raw.BlockHash, event.Raw.TxIndex) + if err != nil { + return errors.Wrap(err, "i.ethClient.TransactionSender") + } - if err := i.saveBlockProposedEvent(ctx, chainID, event, sender); err != nil { - eventindexer.BlockProposedEventsProcessedError.Inc() + if err := i.saveBlockProposedEvent(ctx, chainID, event, sender); err != nil { + eventindexer.BlockProposedEventsProcessedError.Inc() - return errors.Wrap(err, "i.saveBlockProposedEvent") - } + return errors.Wrap(err, "i.saveBlockProposedEvent") + } - if !events.Next() { return nil + }) + + if !events.Next() { + break } } + + if err := wg.Wait(); err != nil { + return err + } + + return nil } func (i *Indexer) saveBlockProposedEvent( diff --git a/packages/eventindexer/indexer/save_block_verified_event.go b/packages/eventindexer/indexer/save_block_verified_event.go index 835794f4848..3f9386cf369 100644 --- a/packages/eventindexer/indexer/save_block_verified_event.go +++ b/packages/eventindexer/indexer/save_block_verified_event.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/taikoxyz/taiko-mono/packages/eventindexer" "github.com/taikoxyz/taiko-mono/packages/eventindexer/contracts/taikol1" + "golang.org/x/sync/errgroup" ) func (i *Indexer) saveBlockVerifiedEvents( @@ -23,19 +24,31 @@ func (i *Indexer) saveBlockVerifiedEvents( return nil } + wg, ctx := errgroup.WithContext(ctx) + for { event := events.Event - if err := i.saveBlockVerifiedEvent(ctx, chainID, event); err != nil { - eventindexer.BlockVerifiedEventsProcessedError.Inc() + wg.Go(func() error { + if err := i.saveBlockVerifiedEvent(ctx, chainID, event); err != nil { + eventindexer.BlockVerifiedEventsProcessedError.Inc() - return errors.Wrap(err, "i.saveBlockVerifiedEvent") - } + return errors.Wrap(err, "i.saveBlockVerifiedEvent") + } - if !events.Next() { return nil + }) + + if !events.Next() { + break } } + + if err := wg.Wait(); err != nil { + return err + } + + return nil } func (i *Indexer) saveBlockVerifiedEvent( diff --git a/packages/eventindexer/indexer/save_message_sent_event.go b/packages/eventindexer/indexer/save_message_sent_event.go index 84fc6f6260b..01d076f99f5 100644 --- a/packages/eventindexer/indexer/save_message_sent_event.go +++ b/packages/eventindexer/indexer/save_message_sent_event.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/taikoxyz/taiko-mono/packages/eventindexer" "github.com/taikoxyz/taiko-mono/packages/eventindexer/contracts/bridge" + "golang.org/x/sync/errgroup" ) func (i *Indexer) saveMessageSentEvents( @@ -20,24 +21,37 @@ func (i *Indexer) saveMessageSentEvents( ) error { if !events.Next() || events.Event == nil { slog.Info("no MessageSent events") + return nil } + wg, ctx := errgroup.WithContext(ctx) + for { event := events.Event - slog.Info("new messageSent event", "owner", event.Message.From.Hex()) + wg.Go(func() error { + slog.Info("new messageSent event", "owner", event.Message.From.Hex()) - if err := i.saveMessageSentEvent(ctx, chainID, event); err != nil { - eventindexer.MessageSentEventsProcessedError.Inc() + if err := i.saveMessageSentEvent(ctx, chainID, event); err != nil { + eventindexer.MessageSentEventsProcessedError.Inc() - return errors.Wrap(err, "i.saveMessageSentEvent") - } + return errors.Wrap(err, "i.saveMessageSentEvent") + } - if !events.Next() { return nil + }) + + if !events.Next() { + break } } + + if err := wg.Wait(); err != nil { + return err + } + + return nil } func (i *Indexer) saveMessageSentEvent( diff --git a/packages/eventindexer/indexer/save_transition_proved_event.go b/packages/eventindexer/indexer/save_transition_proved_event.go index 212e67046d3..00efd2559d7 100644 --- a/packages/eventindexer/indexer/save_transition_proved_event.go +++ b/packages/eventindexer/indexer/save_transition_proved_event.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/taikoxyz/taiko-mono/packages/eventindexer" "github.com/taikoxyz/taiko-mono/packages/eventindexer/contracts/taikol1" + "golang.org/x/sync/errgroup" ) func (i *Indexer) saveTransitionProvedEvents( @@ -23,19 +24,31 @@ func (i *Indexer) saveTransitionProvedEvents( return nil } + wg, ctx := errgroup.WithContext(ctx) + for { event := events.Event - if err := i.saveTransitionProvedEvent(ctx, chainID, event); err != nil { - eventindexer.TransitionProvedEventsProcessedError.Inc() + wg.Go(func() error { + if err := i.saveTransitionProvedEvent(ctx, chainID, event); err != nil { + eventindexer.TransitionProvedEventsProcessedError.Inc() - return errors.Wrap(err, "i.saveBlockProvenEvent") - } + return errors.Wrap(err, "i.saveBlockProvenEvent") + } - if !events.Next() { return nil + }) + + if !events.Next() { + break } } + + if err := wg.Wait(); err != nil { + return err + } + + return nil } func (i *Indexer) saveTransitionProvedEvent( diff --git a/packages/eventindexer/indexer/set_initial_processing_block_height.go b/packages/eventindexer/indexer/set_initial_processing_block_height.go index 619fac1933e..d1866d15dc6 100644 --- a/packages/eventindexer/indexer/set_initial_processing_block_height.go +++ b/packages/eventindexer/indexer/set_initial_processing_block_height.go @@ -2,6 +2,7 @@ package indexer import ( "context" + "log/slog" "github.com/pkg/errors" "github.com/taikoxyz/taiko-mono/packages/eventindexer" @@ -41,6 +42,8 @@ func (i *Indexer) setInitialIndexingBlockByMode( return eventindexer.ErrInvalidMode } + slog.Info("startingBlock", "startingBlock", startingBlock) + i.latestIndexedBlockNumber = startingBlock return nil diff --git a/packages/eventindexer/migrations/20270906208845_create_nft_metadata_table.sql b/packages/eventindexer/migrations/20270906208845_create_nft_metadata_table.sql new file mode 100644 index 00000000000..d99e396f797 --- /dev/null +++ b/packages/eventindexer/migrations/20270906208845_create_nft_metadata_table.sql @@ -0,0 +1,23 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TABLE IF NOT EXISTS nft_metadata ( + id int NOT NULL PRIMARY KEY AUTO_INCREMENT, + chain_id int NOT NULL, + contract_address VARCHAR(42) NOT NULL, + token_id DECIMAL(65, 0) NOT NULL, + name VARCHAR(255) DEFAULT NULL, + description TEXT DEFAULT NULL, + symbol VARCHAR(10) DEFAULT NULL, + attributes JSON DEFAULT NULL, + image_url TEXT DEFAULT NULL, + image_data TEXT DEFAULT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + UNIQUE (contract_address, token_id) +); +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TABLE nft_metadata; +-- +goose StatementEnd diff --git a/packages/eventindexer/migrations/20270906208846_alter_nft_metadata_add_indexes.sql b/packages/eventindexer/migrations/20270906208846_alter_nft_metadata_add_indexes.sql new file mode 100644 index 00000000000..6c95bbe7e57 --- /dev/null +++ b/packages/eventindexer/migrations/20270906208846_alter_nft_metadata_add_indexes.sql @@ -0,0 +1,10 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE `nft_metadata` ADD INDEX `nft_metadata_contract_address_token_id_index` (`contract_address`, `token_id`); + +-- +goose StatementEnd +-- +goose Down +-- +goose StatementBegin +ALTER TABLE nft_metadata + DROP INDEX nft_metadata_contract_address_token_id_index +-- +goose StatementEnd diff --git a/packages/eventindexer/migrations/20270906208847_alter_nft_metadata_table_update_symbol_length.sql b/packages/eventindexer/migrations/20270906208847_alter_nft_metadata_table_update_symbol_length.sql new file mode 100644 index 00000000000..412f64e63b9 --- /dev/null +++ b/packages/eventindexer/migrations/20270906208847_alter_nft_metadata_table_update_symbol_length.sql @@ -0,0 +1,9 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE nft_metadata MODIFY symbol VARCHAR(42) DEFAULT NULL; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +ALTER TABLE nft_metadata MODIFY symbol VARCHAR(10) DEFAULT NULL; +-- +goose StatementEnd \ No newline at end of file diff --git a/packages/eventindexer/migrations/20270906208848_alter_nft_balances_add_metadata_column.sql b/packages/eventindexer/migrations/20270906208848_alter_nft_balances_add_metadata_column.sql new file mode 100644 index 00000000000..43733bb47b7 --- /dev/null +++ b/packages/eventindexer/migrations/20270906208848_alter_nft_balances_add_metadata_column.sql @@ -0,0 +1,14 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE nft_balances + ADD COLUMN nft_metadata_id INT NULL DEFAULT 0, + ADD CONSTRAINT fk_nft_balances_nft_metadata + FOREIGN KEY (nft_metadata_id) REFERENCES nft_metadata(id); +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +ALTER TABLE nft_balances + DROP FOREIGN KEY fk_nft_balances_nft_metadata, + DROP COLUMN nft_metadata_id; +-- +goose StatementEnd diff --git a/packages/eventindexer/nft_balance.go b/packages/eventindexer/nft_balance.go index 1497b32d6e3..f918ada9743 100644 --- a/packages/eventindexer/nft_balance.go +++ b/packages/eventindexer/nft_balance.go @@ -10,16 +10,19 @@ import ( // NFTBalance represents a single contractAddress/tokenId pairing for a given holder // address type NFTBalance struct { - ID int `json:"id"` - ChainID int64 `json:"chainID"` - Address string `json:"address"` - Amount int64 `json:"amount"` - TokenID int64 `json:"tokenID"` - ContractAddress string `json:"contractAddress"` - ContractType string `json:"contractType"` + ID int `json:"id"` + NftMetadataId int64 `json:"nftMetadataId"` + ChainID int64 `json:"chainID"` + Address string `json:"address"` + Amount int64 `json:"amount"` + TokenID int64 `json:"tokenID"` + ContractAddress string `json:"contractAddress"` + ContractType string `json:"contractType"` + Metadata *NFTMetadata `json:"metadata" gorm:"foreignKey:NftMetadataId"` } type UpdateNFTBalanceOpts struct { + NftMetadataId int64 ChainID int64 Address string TokenID int64 diff --git a/packages/eventindexer/nft_metadata.go b/packages/eventindexer/nft_metadata.go new file mode 100644 index 00000000000..a21dc86602a --- /dev/null +++ b/packages/eventindexer/nft_metadata.go @@ -0,0 +1,102 @@ +package eventindexer + +import ( + "context" + "database/sql/driver" + "encoding/json" + "errors" + "fmt" + "net/http" + "strconv" + + "github.com/morkid/paginate" +) + +type Attribute struct { + TraitType string `json:"trait_type"` + Value string `json:"value"` +} + +type Attributes []Attribute + +type NFTMetadata struct { + ID int `json:"id"` + ChainID int64 `json:"chain_id"` + ContractAddress string `json:"contract_address"` + TokenID int64 `json:"token_id"` + Name string `json:"name,omitempty"` + Description string `json:"description,omitempty"` + Symbol string `json:"symbol,omitempty"` + Attributes Attributes `json:"attributes,omitempty"` + ImageURL string `json:"image_url,omitempty"` + ImageData string `json:"image_data,omitempty"` +} + +type NFTMetadataRepository interface { + SaveNFTMetadata(ctx context.Context, metadata *NFTMetadata) (*NFTMetadata, error) + GetNFTMetadata(ctx context.Context, contractAddress string, tokenID int64, chainID int64) (*NFTMetadata, error) + FindByContractAddress(ctx context.Context, req *http.Request, contractAddress string) (paginate.Page, error) +} + +func (n *NFTMetadata) UnmarshalJSON(data []byte) error { + type Alias NFTMetadata + + aux := &struct { + ImageURL string `json:"image_url"` + Image string `json:"image"` + *Alias + }{ + Alias: (*Alias)(n), + } + + if err := json.Unmarshal(data, &aux); err != nil { + return fmt.Errorf("failed to unmarshal JSON: %v", err) + } + + if aux.ImageURL != "" { + n.ImageURL = aux.ImageURL + } else { + n.ImageURL = aux.Image + } + + return nil +} + +func (a Attributes) Value() (driver.Value, error) { + return json.Marshal(a) +} + +func (a *Attributes) Scan(value interface{}) error { + bytes, ok := value.([]byte) + if !ok { + return errors.New("type assertion to []byte failed") + } + + return json.Unmarshal(bytes, a) +} + +func (a *Attribute) UnmarshalJSON(data []byte) error { + type Alias Attribute + + aux := &struct { + Value interface{} `json:"value"` + *Alias + }{ + Alias: (*Alias)(a), + } + + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + + switch v := aux.Value.(type) { + case string: + a.Value = v + case float64: + a.Value = strconv.FormatFloat(v, 'f', -1, 64) + default: + return fmt.Errorf("unexpected type for value field: %T", aux.Value) + } + + return nil +} diff --git a/packages/eventindexer/pkg/http/get_nft_balances_by_address_and_chain_id.go b/packages/eventindexer/pkg/http/get_nft_balances_by_address_and_chain_id.go index 45a017bd24c..bef4f8ae00c 100644 --- a/packages/eventindexer/pkg/http/get_nft_balances_by_address_and_chain_id.go +++ b/packages/eventindexer/pkg/http/get_nft_balances_by_address_and_chain_id.go @@ -5,8 +5,18 @@ import ( "github.com/cyberhorsey/webutils" "github.com/labstack/echo/v4" + "github.com/taikoxyz/taiko-mono/packages/eventindexer" ) +type NFTBalanceWithMetadata struct { + Balance eventindexer.NFTBalance `json:"balance"` + Metadata *eventindexer.NFTMetadata `json:"metadata"` +} + +type NFTBalancesResponse struct { + Balances []NFTBalanceWithMetadata `json:"balances"` +} + // GetNFTBalancesByAddressAndChainID // // returns nft balances by address and chain ID @@ -30,5 +40,21 @@ func (srv *Server) GetNFTBalancesByAddressAndChainID(c echo.Context) error { return webutils.LogAndRenderErrors(c, http.StatusUnprocessableEntity, err) } + for i := range *page.Items.(*[]eventindexer.NFTBalance) { + v := &(*page.Items.(*[]eventindexer.NFTBalance))[i] + + metadata, err := srv.nftMetadataRepo.GetNFTMetadata( + c.Request().Context(), + v.ContractAddress, + v.TokenID, + v.ChainID, + ) + if err != nil { + return webutils.LogAndRenderErrors(c, http.StatusUnprocessableEntity, err) + } + + v.Metadata = metadata + } + return c.JSON(http.StatusOK, page) } diff --git a/packages/eventindexer/pkg/http/server.go b/packages/eventindexer/pkg/http/server.go index e774e898f45..d2d05fe1d21 100644 --- a/packages/eventindexer/pkg/http/server.go +++ b/packages/eventindexer/pkg/http/server.go @@ -31,6 +31,7 @@ type Server struct { echo *echo.Echo eventRepo eventindexer.EventRepository nftBalanceRepo eventindexer.NFTBalanceRepository + nftMetadataRepo eventindexer.NFTMetadataRepository erc20BalanceRepo eventindexer.ERC20BalanceRepository chartRepo eventindexer.ChartRepository cache *cache.Cache @@ -40,6 +41,7 @@ type NewServerOpts struct { Echo *echo.Echo EventRepo eventindexer.EventRepository NFTBalanceRepo eventindexer.NFTBalanceRepository + NFTMetadataRepo eventindexer.NFTMetadataRepository ERC20BalanceRepo eventindexer.ERC20BalanceRepository ChartRepo eventindexer.ChartRepository EthClient *ethclient.Client @@ -63,6 +65,10 @@ func (opts NewServerOpts) Validate() error { return eventindexer.ErrNoNFTBalanceRepository } + if opts.NFTMetadataRepo == nil { + return eventindexer.ErrNoNFTMetadataRepository + } + return nil } @@ -77,6 +83,7 @@ func NewServer(opts NewServerOpts) (*Server, error) { echo: opts.Echo, eventRepo: opts.EventRepo, nftBalanceRepo: opts.NFTBalanceRepo, + nftMetadataRepo: opts.NFTMetadataRepo, erc20BalanceRepo: opts.ERC20BalanceRepo, chartRepo: opts.ChartRepo, cache: cache, diff --git a/packages/eventindexer/pkg/http/server_test.go b/packages/eventindexer/pkg/http/server_test.go index 215615d1f6b..5eeff772750 100644 --- a/packages/eventindexer/pkg/http/server_test.go +++ b/packages/eventindexer/pkg/http/server_test.go @@ -24,6 +24,7 @@ func newTestServer(url string) *Server { echo: echo.New(), eventRepo: mock.NewEventRepository(), nftBalanceRepo: mock.NewNFTBalanceRepository(), + nftMetadataRepo: mock.NewNFTMetadataRepository(), erc20BalanceRepo: mock.NewERC20BalanceRepository(), } @@ -46,6 +47,7 @@ func Test_NewServer(t *testing.T) { EventRepo: &repo.EventRepository{}, CorsOrigins: make([]string, 0), NFTBalanceRepo: &repo.NFTBalanceRepository{}, + NFTMetadataRepo: &repo.NFTMetadataRepository{}, ERC20BalanceRepo: &repo.ERC20BalanceRepository{}, }, nil, @@ -60,20 +62,42 @@ func Test_NewServer(t *testing.T) { eventindexer.ErrNoNFTBalanceRepository, }, { - "noEventRepo", + "noNftMetadataRepo", NewServerOpts{ Echo: echo.New(), - CorsOrigins: make([]string, 0), + EventRepo: &repo.EventRepository{}, NFTBalanceRepo: &repo.NFTBalanceRepository{}, + CorsOrigins: make([]string, 0), }, - eventindexer.ErrNoEventRepository, + eventindexer.ErrNoNFTMetadataRepository, }, { - "noCorsOrigins", + "noEventRepo", NewServerOpts{ Echo: echo.New(), EventRepo: &repo.EventRepository{}, NFTBalanceRepo: &repo.NFTBalanceRepository{}, + CorsOrigins: make([]string, 0), + }, + eventindexer.ErrNoNFTMetadataRepository, + }, + { + "noEventRepo", + NewServerOpts{ + Echo: echo.New(), + CorsOrigins: make([]string, 0), + NFTBalanceRepo: &repo.NFTBalanceRepository{}, + NFTMetadataRepo: &repo.NFTMetadataRepository{}, + }, + eventindexer.ErrNoEventRepository, + }, + { + "noCorsOrigins", + NewServerOpts{ + Echo: echo.New(), + EventRepo: &repo.EventRepository{}, + NFTBalanceRepo: &repo.NFTBalanceRepository{}, + NFTMetadataRepo: &repo.NFTMetadataRepository{}, }, eventindexer.ErrNoCORSOrigins, }, diff --git a/packages/eventindexer/pkg/mock/nft_metadata_repository.go b/packages/eventindexer/pkg/mock/nft_metadata_repository.go new file mode 100644 index 00000000000..4db7a44e0b5 --- /dev/null +++ b/packages/eventindexer/pkg/mock/nft_metadata_repository.go @@ -0,0 +1,57 @@ +package mock + +import ( + "context" + "net/http" + + "github.com/morkid/paginate" + "github.com/taikoxyz/taiko-mono/packages/eventindexer" +) + +type NFTMetadataRepository struct { + nftMetadata []*eventindexer.NFTMetadata +} + +func NewNFTMetadataRepository() *NFTMetadataRepository { + return &NFTMetadataRepository{} +} + +func (r *NFTMetadataRepository) FindByContractAddress( + ctx context.Context, + req *http.Request, + contractAddress string) (paginate.Page, error) { + var metadata []*eventindexer.NFTMetadata + + for _, b := range r.nftMetadata { + if b.ContractAddress == contractAddress { + metadata = append(metadata, b) + } + } + + return paginate.Page{ + Items: metadata, + }, nil +} + +func (r *NFTMetadataRepository) GetNFTMetadata( + ctx context.Context, + contractAddress string, + tokenID int64, + chainID int64, +) (*eventindexer.NFTMetadata, error) { + for _, metadata := range r.nftMetadata { + if metadata.ContractAddress == contractAddress && metadata.TokenID == tokenID { + return metadata, nil + } + } + + return nil, nil +} + +func (r *NFTMetadataRepository) SaveNFTMetadata( + ctx context.Context, + metadata *eventindexer.NFTMetadata) (*eventindexer.NFTMetadata, error) { + r.nftMetadata = append(r.nftMetadata, metadata) + + return metadata, nil +} diff --git a/packages/eventindexer/pkg/repo/erc20_balance.go b/packages/eventindexer/pkg/repo/erc20_balance.go index 52c3bd3dc31..9bf52bfe330 100644 --- a/packages/eventindexer/pkg/repo/erc20_balance.go +++ b/packages/eventindexer/pkg/repo/erc20_balance.go @@ -4,6 +4,8 @@ import ( "context" "math/big" "net/http" + "strings" + "time" "github.com/morkid/paginate" "github.com/pkg/errors" @@ -118,22 +120,40 @@ func (r *ERC20BalanceRepository) IncreaseAndDecreaseBalancesInTx( increaseOpts eventindexer.UpdateERC20BalanceOpts, decreaseOpts eventindexer.UpdateERC20BalanceOpts, ) (increasedBalance *eventindexer.ERC20Balance, decreasedBalance *eventindexer.ERC20Balance, err error) { - err = r.db.GormDB().Transaction(func(tx *gorm.DB) (err error) { - increasedBalance, err = r.increaseBalanceInDB(ctx, tx, increaseOpts) - if err != nil { + retries := 10 + for retries > 0 { + err = r.db.GormDB().Transaction(func(tx *gorm.DB) (err error) { + increasedBalance, err = r.increaseBalanceInDB(ctx, tx, increaseOpts) + if err != nil { + return err + } + + if decreaseOpts.Amount != "0" && decreaseOpts.Amount != "" { + decreasedBalance, err = r.decreaseBalanceInDB(ctx, tx, decreaseOpts) + } + return err + }) + + if err == nil { + break } - if decreaseOpts.Amount != "0" && decreaseOpts.Amount != "" { - decreasedBalance, err = r.decreaseBalanceInDB(ctx, tx, decreaseOpts) + if strings.Contains(err.Error(), "Deadlock") { + retries-- + + time.Sleep(100 * time.Millisecond) // backoff before retrying + + continue } - return err - }) - if err != nil { return nil, nil, errors.Wrap(err, "r.db.Transaction") } + if err != nil { + return nil, nil, err + } + return increasedBalance, decreasedBalance, nil } diff --git a/packages/eventindexer/pkg/repo/nft_balance.go b/packages/eventindexer/pkg/repo/nft_balance.go index ccd6df691b8..d1bab45d5da 100644 --- a/packages/eventindexer/pkg/repo/nft_balance.go +++ b/packages/eventindexer/pkg/repo/nft_balance.go @@ -3,6 +3,8 @@ package repo import ( "context" "net/http" + "strings" + "time" "github.com/morkid/paginate" "github.com/pkg/errors" @@ -31,6 +33,7 @@ func (r *NFTBalanceRepository) increaseBalanceInDB( ) (*eventindexer.NFTBalance, error) { b := &eventindexer.NFTBalance{ ContractAddress: opts.ContractAddress, + NftMetadataId: opts.NftMetadataId, TokenID: opts.TokenID, Address: opts.Address, ContractType: opts.ContractType, @@ -68,6 +71,7 @@ func (r *NFTBalanceRepository) decreaseBalanceInDB( ) (*eventindexer.NFTBalance, error) { b := &eventindexer.NFTBalance{ ContractAddress: opts.ContractAddress, + NftMetadataId: opts.NftMetadataId, TokenID: opts.TokenID, Address: opts.Address, ContractType: opts.ContractType, @@ -112,22 +116,40 @@ func (r *NFTBalanceRepository) IncreaseAndDecreaseBalancesInTx( increaseOpts eventindexer.UpdateNFTBalanceOpts, decreaseOpts eventindexer.UpdateNFTBalanceOpts, ) (increasedBalance *eventindexer.NFTBalance, decreasedBalance *eventindexer.NFTBalance, err error) { - err = r.db.GormDB().Transaction(func(tx *gorm.DB) (err error) { - increasedBalance, err = r.increaseBalanceInDB(ctx, tx, increaseOpts) - if err != nil { + retries := 10 + for retries > 0 { + err = r.db.GormDB().Transaction(func(tx *gorm.DB) (err error) { + increasedBalance, err = r.increaseBalanceInDB(ctx, tx, increaseOpts) + if err != nil { + return err + } + + if decreaseOpts.Amount != 0 { + decreasedBalance, err = r.decreaseBalanceInDB(ctx, tx, decreaseOpts) + } + return err + }) + + if err == nil { + break } - if decreaseOpts.Amount != 0 { - decreasedBalance, err = r.decreaseBalanceInDB(ctx, tx, decreaseOpts) + if strings.Contains(err.Error(), "Deadlock") { + retries-- + + time.Sleep(100 * time.Millisecond) // backoff before retrying + + continue } - return err - }) - if err != nil { return nil, nil, errors.Wrap(err, "r.db.Transaction") } + if err != nil { + return nil, nil, err + } + return increasedBalance, decreasedBalance, nil } diff --git a/packages/eventindexer/pkg/repo/nft_balance_test.go b/packages/eventindexer/pkg/repo/nft_balance_test.go index 86d04be8602..650956a14f4 100644 --- a/packages/eventindexer/pkg/repo/nft_balance_test.go +++ b/packages/eventindexer/pkg/repo/nft_balance_test.go @@ -48,9 +48,22 @@ func TestIntegration_NFTBalance_Increase_And_Decrease(t *testing.T) { nftBalanceRepo, err := NewNFTBalanceRepository(db) assert.Equal(t, nil, err) + nftMetadataRepo, err := NewNFTMetadataRepository(db) + assert.Equal(t, nil, err) + + md, err := nftMetadataRepo.SaveNFTMetadata(context.Background(), &eventindexer.NFTMetadata{ + ChainID: 1, + ContractAddress: "0x123", + TokenID: 1, + Name: "test", + }) + assert.Equal(t, nil, err) + assert.NotNil(t, md) + bal1, _, err := nftBalanceRepo.IncreaseAndDecreaseBalancesInTx(context.Background(), eventindexer.UpdateNFTBalanceOpts{ ChainID: 1, + NftMetadataId: int64(md.ID), Address: "0x123", TokenID: 1, ContractAddress: "0x123", @@ -63,6 +76,7 @@ func TestIntegration_NFTBalance_Increase_And_Decrease(t *testing.T) { bal2, _, err := nftBalanceRepo.IncreaseAndDecreaseBalancesInTx(context.Background(), eventindexer.UpdateNFTBalanceOpts{ ChainID: 1, + NftMetadataId: int64(md.ID), Address: "0x123", TokenID: 1, ContractAddress: "0x123456", @@ -82,6 +96,7 @@ func TestIntegration_NFTBalance_Increase_And_Decrease(t *testing.T) { "success", eventindexer.UpdateNFTBalanceOpts{ ChainID: 1, + NftMetadataId: int64(md.ID), Address: "0x123", TokenID: 1, ContractAddress: "0x123456789", @@ -90,6 +105,7 @@ func TestIntegration_NFTBalance_Increase_And_Decrease(t *testing.T) { }, eventindexer.UpdateNFTBalanceOpts{ ChainID: 1, + NftMetadataId: int64(md.ID), Address: "0x123", TokenID: 1, ContractAddress: "0x123", @@ -102,6 +118,7 @@ func TestIntegration_NFTBalance_Increase_And_Decrease(t *testing.T) { "one left", eventindexer.UpdateNFTBalanceOpts{ ChainID: 1, + NftMetadataId: int64(md.ID), Address: "0x123", TokenID: 1, ContractAddress: "0x123456789", @@ -110,6 +127,7 @@ func TestIntegration_NFTBalance_Increase_And_Decrease(t *testing.T) { }, eventindexer.UpdateNFTBalanceOpts{ ChainID: 1, + NftMetadataId: int64(md.ID), Address: "0x123", TokenID: 1, ContractAddress: "0x123456", diff --git a/packages/eventindexer/pkg/repo/nft_metadata.go b/packages/eventindexer/pkg/repo/nft_metadata.go new file mode 100644 index 00000000000..384d4d80db3 --- /dev/null +++ b/packages/eventindexer/pkg/repo/nft_metadata.go @@ -0,0 +1,91 @@ +package repo + +import ( + "context" + "net/http" + + "github.com/morkid/paginate" + "github.com/pkg/errors" + "github.com/taikoxyz/taiko-mono/packages/eventindexer" + "gorm.io/gorm" +) + +type NFTMetadataRepository struct { + db eventindexer.DB +} + +func NewNFTMetadataRepository(db eventindexer.DB) (*NFTMetadataRepository, error) { + if db == nil { + return nil, eventindexer.ErrNoDB + } + + return &NFTMetadataRepository{ + db: db, + }, nil +} + +func (r *NFTMetadataRepository) SaveNFTMetadata( + ctx context.Context, + metadata *eventindexer.NFTMetadata, +) (*eventindexer.NFTMetadata, error) { + existingMetadata, err := r.GetNFTMetadata(ctx, metadata.ContractAddress, metadata.TokenID, metadata.ChainID) + if err != nil { + return nil, errors.Wrap(err, "failed to check existing metadata") + } + + if existingMetadata != nil { + return existingMetadata, nil + } + + err = r.db.GormDB().Save(metadata).Error + if err != nil { + return nil, errors.Wrap(err, "r.db.Save") + } + + return metadata, nil +} + +func (r *NFTMetadataRepository) GetNFTMetadata( + ctx context.Context, + contractAddress string, + tokenID int64, + chainID int64, +) (*eventindexer.NFTMetadata, error) { + metadata := &eventindexer.NFTMetadata{} + + err := r.db.GormDB(). + Where("contract_address = ?", contractAddress). + Where("token_id = ?", tokenID). + Where("chain_id = ?", chainID). + First(metadata). + Error + + if err != nil { + if err == gorm.ErrRecordNotFound { + return nil, nil + } + + return nil, errors.Wrap(err, "r.db.First") + } + + return metadata, nil +} + +func (r *NFTMetadataRepository) FindByContractAddress( + ctx context.Context, + req *http.Request, + contractAddress string, +) (paginate.Page, error) { + pg := paginate.New(&paginate.Config{ + DefaultSize: 100, + }) + + q := r.db.GormDB(). + Raw("SELECT * FROM nft_metadata WHERE contract_address = ?", contractAddress) + + reqCtx := pg.With(q) + + page := reqCtx.Request(req).Response(&[]eventindexer.NFTMetadata{}) + + return page, nil +}