diff --git a/services/crawler/cli.go b/services/crawler/cli.go index 2ee6afe6e..0cb0138df 100644 --- a/services/crawler/cli.go +++ b/services/crawler/cli.go @@ -76,16 +76,6 @@ func RootCmd(c *cli.Context) error { return err } - l := crawler.Logger - - l.Info("introspecting...\n") - - err = crawler.Introspect() - - if err != nil { - return err - } - err = crawler.Crawl() if err != nil { @@ -94,15 +84,15 @@ func RootCmd(c *cli.Context) error { defer crawler.Close() - streamer, err := NewEthereumStreamer() + // streamer, err := NewEthereumStreamer() - if err != nil { - return err - } + // if err != nil { + // return err + // } // for now use crawler's introspect and s3 client rather than recreating them - streamer.Glue = crawler.Glue - streamer.S3 = crawler.S3 + // streamer.Glue = crawler.Glue + // streamer.S3 = crawler.S3 // err = streamer.Stream() diff --git a/services/crawler/crawler.go b/services/crawler/crawler.go index d3315168a..844da52aa 100644 --- a/services/crawler/crawler.go +++ b/services/crawler/crawler.go @@ -182,111 +182,6 @@ func NewEthereumCrawler() (*EthereumCrawler, error) { }, nil } -func (c *EthereumCrawler) Introspect() error { - l := c.Logger - err := c.Glue.LoadDatabases() - - if err != nil { - return err - } - - err = c.Glue.LoadTables(AnalyticsDatabaseDev) - - if err != nil { - return err - } - - for _, t := range c.Glue.Tables { - tableVersion, err := strconv.Atoi(string([]rune(*t.Name)[len(*t.Name)-1])) - - if err != nil { - return err - } - - table := Table{ - Database: *t.DatabaseName, - Name: *t.Name, - Version: strconv.Itoa(tableVersion), - } - - resourceVersion, err := ResourceVersion() - - if err != nil { - return err - } - - // we expect table version to match resource version otherwise the resoure is not ready yet wait - if tableVersion != resourceVersion { - l.Error(fmt.Sprintf("database=%s %s table=%s resource_version=%s \n", AnalyticsDatabaseDev, table.String(), *t.Name, strconv.Itoa(resourceVersion))) - return 
errors.New("resource version does not match table version") - } - - if t.StorageDescriptor.Location != nil { - table.Bucket = *t.StorageDescriptor.Location - } - - if t.StorageDescriptor.SerdeInfo.Name == nil { - serde := t.StorageDescriptor.SerdeInfo.SerializationLibrary - table.SerDe = strings.Split(*serde, ".")[3] - } else { - table.SerDe = *t.StorageDescriptor.SerdeInfo.Name - } - - if strings.Contains(*t.Name, "event") { - c.txBucket = table.Bucket - } else if strings.Contains(*t.Name, "staking") { - c.stakingBucket = table.Bucket - } else if strings.Contains(*t.Name, "wallet") { - c.walletBucket = table.Bucket - } - } - - if strings.HasPrefix(c.txBucket, "s3://") { - c.txBucket = strings.TrimPrefix(c.txBucket, "s3://") - } - - if strings.HasPrefix(c.walletBucket, "s3://") { - c.walletBucket = strings.TrimPrefix(c.walletBucket, "s3://") - } - - return nil -} - -func ResourceVersion() (int, error) { - f, err := os.ReadFile("common/data/package.json") - - if err != nil { - return 0, err - } - - var pkg PkgJSON - - err = json.Unmarshal(f, &pkg) - - if err != nil { - return 0, err - } - - var major int - - semver := strings.Split(pkg.Version, ".") - - if len(semver) < 3 { - return 0, errors.New("invalid semver") - } - - major, err = strconv.Atoi(semver[0]) - - if err != nil { - return 0, err - } - - if major < 1 { - return 0, errors.New("major version must be greater than 0") - } - return major, nil -} - func (c *EthereumCrawler) Crawl() error { l := c.Logger @@ -416,9 +311,10 @@ func (c *EthereumCrawler) SaveWalletEvents(block int, wallet []*WalletEvent) err return err } - c.Logger.Info("uploaded %d wallet events to %s\n", len(wallet), dest) + c.Logger.Info("uploaded block %d to %s\n", block, dest) return nil } + func (c *EthereumCrawler) Close() { c.Wg.Wait() c.Elapsed = time.Since(c.Begin) @@ -440,7 +336,7 @@ func (c *EthereumCrawler) ProcessBlock(height int) ([]*Event, []*WalletEvent, er return nil, nil, err } - l.Info("processing_block=%d\n", 
block.Number().Int64()) + l.Info("processing block=%d\n", block.Number().Int64()) blockEvent, err := c.EventFromBlock(block) @@ -452,7 +348,7 @@ func (c *EthereumCrawler) ProcessBlock(height int) ([]*Event, []*WalletEvent, er if block.Transactions().Len() > 0 { for i, tx := range block.Transactions() { - l.Info("processing tx %d of %d\n", i+1, block.Transactions().Len()) + l.Info("processing tx %d of %d in block %d\n", i+1, block.Transactions().Len(), block.Number().Int64()) receipt, err := c.Client.TransactionReceipt(context.Background(), tx.Hash()) @@ -484,17 +380,17 @@ func (c *EthereumCrawler) EventsFromTransaction(b *types.Block, receipt *types.R Chain: Ethereum, Network: c.Network, Provider: Casimir, + Block: b.Hash().Hex(), Type: Transaction, Height: int64(b.Number().Uint64()), Transaction: tx.Hash().Hex(), - ReceivedAt: time.Unix(int64(b.Time()), 0).Format(AWSAthenaTimeFormat), + ReceivedAt: time.Unix(int64(b.Time()), 0).Format("2006-01-02 15:04:05.999999999"), } if tx.Value() != nil { txEvent.Amount = tx.Value().String() } - // gas fee = gas price * gas used txEvent.GasFee = new(big.Int).Mul(tx.GasPrice(), big.NewInt(int64(receipt.GasUsed))).String() if tx.To() != nil { @@ -570,11 +466,119 @@ func (c *EthereumCrawler) EventFromBlock(b *types.Block) (*Event, error) { Type: Block, Height: int64(b.Number().Uint64()), Block: b.Hash().Hex(), - ReceivedAt: time.Unix(int64(b.Time()), 0).Format(AWSAthenaTimeFormat), + ReceivedAt: time.Unix(int64(b.Time()), 0).Format("2006-01-02 15:04:05.999999999"), } return &event, nil } +func (c *EthereumCrawler) Introspect() error { + l := c.Logger + + l.Info("introspecting...\n") + + err := c.Glue.LoadDatabases() + + if err != nil { + return err + } + + err = c.Glue.LoadTables(AnalyticsDatabaseDev) + + if err != nil { + return err + } + + for _, t := range c.Glue.Tables { + tableVersion, err := strconv.Atoi(string([]rune(*t.Name)[len(*t.Name)-1])) + + if err != nil { + return err + } + + table := Table{ + Database: 
*t.DatabaseName, + Name: *t.Name, + Version: strconv.Itoa(tableVersion), + } + + resourceVersion, err := ResourceVersion() + + if err != nil { + return err + } + + // we expect table version to match resource version otherwise the resource is not ready yet wait + if tableVersion != resourceVersion { + l.Error(fmt.Sprintf("database=%s %s table=%s resource_version=%s \n", AnalyticsDatabaseDev, table.String(), *t.Name, strconv.Itoa(resourceVersion))) + return errors.New("resource version does not match table version") + } + + if t.StorageDescriptor.Location != nil { + table.Bucket = *t.StorageDescriptor.Location + } + + if t.StorageDescriptor.SerdeInfo.Name == nil { + serde := t.StorageDescriptor.SerdeInfo.SerializationLibrary + table.SerDe = strings.Split(*serde, ".")[3] + } else { + table.SerDe = *t.StorageDescriptor.SerdeInfo.Name + } + + if strings.Contains(*t.Name, "event") { + c.txBucket = table.Bucket + } else if strings.Contains(*t.Name, "staking") { + c.stakingBucket = table.Bucket + } else if strings.Contains(*t.Name, "wallet") { + c.walletBucket = table.Bucket + } + } + + if strings.HasPrefix(c.txBucket, "s3://") { + c.txBucket = strings.TrimPrefix(c.txBucket, "s3://") + } + + if strings.HasPrefix(c.walletBucket, "s3://") { + c.walletBucket = strings.TrimPrefix(c.walletBucket, "s3://") + } + + return nil +} + +func ResourceVersion() (int, error) { + f, err := os.ReadFile("common/data/package.json") + + if err != nil { + return 0, err + } + + var pkg PkgJSON + + err = json.Unmarshal(f, &pkg) + + if err != nil { + return 0, err + } + + var major int + + semver := strings.Split(pkg.Version, ".") + + if len(semver) < 3 { + return 0, errors.New("invalid semver") + } + + major, err = strconv.Atoi(semver[0]) + + if err != nil { + return 0, err + } + + if major < 1 { + return 0, errors.New("major version must be greater than 0") + } + return major, nil +} + func (t Table) String() string { return fmt.Sprintf("table=%s version=%s database=%s bucket=%s serde=%s", 
t.Name, t.Version, t.Database, t.Bucket, t.SerDe) } diff --git a/services/crawler/ethereum_test.go b/services/crawler/ethereum_test.go index ecf63f80b..f6545039d 100644 --- a/services/crawler/ethereum_test.go +++ b/services/crawler/ethereum_test.go @@ -1,6 +1,7 @@ package main import ( + "context" "net/url" "os" "testing" @@ -27,7 +28,11 @@ func TestNewEtheruemClient(t *testing.T) { t.Fatal(err) } - if client == nil { - t.Fatal("client is nil") + head, err := client.Client.HeaderByNumber(context.Background(), nil) + + if err != nil { + t.Fatal(err) } + + t.Log(head.Number.String()) } diff --git a/services/crawler/exchange.go b/services/crawler/exchange.go index 57ccc9efd..7e5cdfb6b 100644 --- a/services/crawler/exchange.go +++ b/services/crawler/exchange.go @@ -231,9 +231,16 @@ func (c CryptoCompareExchange) HistoricalPrice(coin ChainType, currency Currency path := "index/cc/v1/historical/minutes" - // to_ts (timestamp) returns historical data before this unix timestamp - // use limit=2000 and keep going back in time using the to_ts param. 
- url := fmt.Sprintf("%s/%s?market=ccix&instruments=%s-%s&limit=20&to_ts=%d", c.BaseUrl, path, coin.Short(), currency.String(), received.Unix()) + limit := 20 + + fmt.Println(received) + // convert the time.Time to unix timestamp + unixTime := received.UTC().Unix() + + url := fmt.Sprintf("%s/%s?market=ccix&instrument=%s-%s&limit=%d&to_ts=%d", c.BaseUrl, path, coin.Short(), currency.String(), limit, unixTime) + + fmt.Println(unixTime) + fmt.Println(url) req, err := httpClient.Get(url) @@ -265,11 +272,10 @@ func (c ChainType) Short() string { switch c { case Ethereum: return "ETH" - case Iotex: - return "IOTX" - + case Bitcoin: + return "BTC" default: - panic("invalid chain type") + return "" } } diff --git a/services/crawler/exchange_test.go b/services/crawler/exchange_test.go index 368e988da..83f4307ea 100644 --- a/services/crawler/exchange_test.go +++ b/services/crawler/exchange_test.go @@ -1,6 +1,10 @@ package main import ( + "context" + "fmt" + "math/big" + "net/url" "os" "testing" ) @@ -20,9 +24,65 @@ func TestCurrentPrice(t *testing.T) { t.Error(err) } - _, err = exchange.CurrentPrice(Ethereum, USD) + price, err := exchange.CurrentPrice(Ethereum, USD) if err != nil { t.Error(err) } + + fmt.Println(price) +} + +func TestHistoricalPrice(t *testing.T) { + err := LoadEnv() + + if err != nil { + t.Error(err) + } + + raw := os.Getenv("ETHEREUM_RPC") + + fmt.Println(raw) + + url, err := url.Parse(raw) + + if err != nil { + t.Fatal(err) + } + + client, err := NewEthereumClient(Casimir, *url) + + fmt.Println(client) + + if err != nil { + t.Fatal(err) + } + + block, err := client.Client.BlockByNumber(context.Background(), big.NewInt(10000000)) + + if err != nil { + t.Error(err) + } + + fmt.Println(block.Number().String()) + + // if err != nil { + // t.Error(err) + // } + + // key := os.Getenv("CRYPTOCOMPARE_API_KEY") + + // exchange, err := NewCryptoCompareExchange(key) + + // if err != nil { + // t.Error(err) + // } + + // price, err := 
exchange.HistoricalPrice(Ethereum, USD, block.ReceivedAt) + + // if err != nil { + // t.Error(err) + // } + + // fmt.Println(price) } diff --git a/services/crawler/streamer.go b/services/crawler/streamer.go index 49b2a7ef7..dc0baf336 100644 --- a/services/crawler/streamer.go +++ b/services/crawler/streamer.go @@ -128,132 +128,159 @@ func (s *EthereumStreamer) Stream() error { case <-ctx.Done(): return nil case h := <-head: - s.Mutex.Lock() - defer s.Mutex.Unlock() - s.ProcessingBlock = h.Number.Uint64() - - _, err := s.ProcessBlock() + _, _, err := s.ProcessBlock(int(h.Number.Uint64())) if err != nil { l.Error(err.Error()) return err } - - // price, err := s.Exchange.CurrentPrice(Ethereum, USD) - - // if err != nil { - // l.Error(err.Error()) - // return err - // } } } } -func (s *EthereumStreamer) ProcessBlock() ([]*Event, error) { +func (s *EthereumStreamer) ProcessBlock(height int) ([]*Event, []*WalletEvent, error) { l := s.Logger - l.Info("streaming block=%d\n", s.ProcessingBlock) - - ctx := context.Background() + var events []*Event + var walletEvents []*WalletEvent - block, err := s.EthereumClient.Client.BlockByNumber(ctx, new(big.Int).SetUint64(s.ProcessingBlock)) + block, err := s.Client.BlockByNumber(context.Background(), big.NewInt(int64(height))) if err != nil { - return nil, err + return nil, nil, err } - var events []*Event + l.Info("processing block=%d\n", block.Number().Int64()) blockEvent, err := s.EventFromBlock(block) if err != nil { - return nil, err + return nil, nil, err } events = append(events, blockEvent) - // for _, tx := range block.Transactions() { - // txEvent, err := s.TransactionEvent(tx) + if block.Transactions().Len() > 0 { + for i, tx := range block.Transactions() { + l.Info("processing tx %d of %d in block %d\n", i+1, block.Transactions().Len(), block.Number().Int64()) - // if err != nil { - // return nil, err - // } + receipt, err := s.Client.TransactionReceipt(context.Background(), tx.Hash()) - // } - return events, nil -} + if 
err != nil { + return nil, nil, err + } -func (c *EthereumStreamer) EventFromBlock(b *types.Block) (*Event, error) { - event := Event{ - Chain: Ethereum, - Network: c.Network, - Provider: Casimir, - Type: Block, - Height: int64(b.Number().Uint64()), - } + txEvents, walletEvent, err := s.EventsFromTransaction(block, receipt) - if b.Hash().Hex() != "" { - event.Block = b.Hash().Hex() - } + if err != nil { + return nil, nil, err + } - if b.Time() != 0 { - event.ReceivedAt = time.Unix(int64(b.Time()), 0).Format("2006-01-02 15:04:05.999999999") + events = append(events, txEvents...) + walletEvents = append(walletEvents, walletEvent...) + } } - - return &event, nil + return events, walletEvents, nil } -func (c *EthereumStreamer) TransactionEvent(b *types.Block, tx *types.Receipt) ([]*Event, error) { - var events []*Event +func (s *EthereumStreamer) EventsFromTransaction(b *types.Block, receipt *types.Receipt) ([]*Event, []*WalletEvent, error) { + var txEvents []*Event + var walletEvents []*WalletEvent + + l := s.Logger for index, tx := range b.Transactions() { txEvent := Event{ - Chain: Ethereum, - Network: c.Network, - Provider: Casimir, - Type: Transaction, - Height: int64(b.Number().Uint64()), + Chain: Ethereum, + Network: s.Network, + Provider: Casimir, + Block: b.Hash().Hex(), + Type: Transaction, + Height: int64(b.Number().Uint64()), + Transaction: tx.Hash().Hex(), + ReceivedAt: time.Unix(int64(b.Time()), 0).Format("2006-01-02 15:04:05.999999999"), } - if tx.Hash().Hex() != "" { - txEvent.Transaction = tx.Hash().Hex() + if tx.Value() != nil { + txEvent.Amount = tx.Value().String() } - // recipient - if tx.To().Hex() != "" { - txEvent.Recipient = tx.To().Hex() + txEvent.GasFee = new(big.Int).Mul(tx.GasPrice(), big.NewInt(int64(receipt.GasUsed))).String() - // get receipt balance - recipientBalance, err := c.Client.BalanceAt(context.Background(), *tx.To(), b.Number()) + if tx.To() != nil { + txEvent.Recipient = tx.To().Hex() + recipeintBalance, err := 
s.Client.BalanceAt(context.Background(), *tx.To(), b.Number()) if err != nil { - return nil, err + return nil, nil, err } - txEvent.RecipientBalance = recipientBalance.String() - } - // amount - if tx.Value().String() != "" { - txEvent.Amount = tx.Value().String() + txEvent.RecipientBalance = recipeintBalance.String() } - sender, err := c.Client.TransactionSender(context.Background(), tx, b.Hash(), uint(index)) + sender, err := s.Client.TransactionSender(context.Background(), tx, b.Hash(), uint(index)) if err != nil { - return nil, err + return nil, nil, err } - // sender if sender.Hex() != "" { txEvent.Sender = sender.Hex() - senderBalance, err := c.Client.BalanceAt(context.Background(), sender, b.Number()) + senderBalance, err := s.Client.BalanceAt(context.Background(), sender, b.Number()) if err != nil { - return nil, err + return nil, nil, err } + txEvent.SenderBalance = senderBalance.String() } + + txEvents = append(txEvents, &txEvent) + + senderWalletEvent := WalletEvent{ + WalletAddress: txEvent.Sender, + Balance: txEvent.SenderBalance, + Direction: Outgoing, + TxId: txEvent.Transaction, + ReceivedAt: txEvent.ReceivedAt, + Amount: txEvent.Amount, + Price: txEvent.Price, + GasFee: txEvent.GasFee, + } + + walletEvents = append(walletEvents, &senderWalletEvent) + + receiptWalletEvent := WalletEvent{ + WalletAddress: txEvent.Recipient, + Balance: txEvent.RecipientBalance, + Direction: Incoming, + TxId: txEvent.Transaction, + ReceivedAt: txEvent.ReceivedAt, + Amount: txEvent.Amount, + Price: txEvent.Price, + GasFee: txEvent.GasFee, + } + walletEvents = append(walletEvents, &receiptWalletEvent) + // TODO: handle contract events (staking action) + } + + if len(walletEvents) == 0 || len(walletEvents) != len(txEvents)*2 { + l.Error("wallet events and tx events mismatch, wallet events=%d tx events=%d", len(walletEvents), len(txEvents)) + } + + return txEvents, walletEvents, nil +} + +func (s *EthereumStreamer) EventFromBlock(b *types.Block) (*Event, error) { + event 
:= Event{ + Chain: Ethereum, + Network: s.Network, + Provider: Casimir, + Type: Block, + Height: int64(b.Number().Uint64()), + Block: b.Hash().Hex(), + ReceivedAt: time.Unix(int64(b.Time()), 0).Format("2006-01-02 15:04:05.999999999"), } - return events, nil + return &event, nil }