Skip to content

Commit

Permalink
Fix streamer
Browse files Browse the repository at this point in the history
  • Loading branch information
hawyar committed Jun 29, 2023
1 parent 8319d39 commit 6fb214f
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 206 deletions.
22 changes: 6 additions & 16 deletions services/crawler/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,6 @@ func RootCmd(c *cli.Context) error {
return err
}

l := crawler.Logger

l.Info("introspecting...\n")

err = crawler.Introspect()

if err != nil {
return err
}

err = crawler.Crawl()

if err != nil {
Expand All @@ -94,15 +84,15 @@ func RootCmd(c *cli.Context) error {

defer crawler.Close()

streamer, err := NewEthereumStreamer()
// streamer, err := NewEthereumStreamer()

if err != nil {
return err
}
// if err != nil {
// return err
// }

// for now use crawler's introspect and s3 client rather than recreating them
streamer.Glue = crawler.Glue
streamer.S3 = crawler.S3
// streamer.Glue = crawler.Glue
// streamer.S3 = crawler.S3

// err = streamer.Stream()

Expand Down
226 changes: 115 additions & 111 deletions services/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,111 +182,6 @@ func NewEthereumCrawler() (*EthereumCrawler, error) {
}, nil
}

// Introspect loads the Glue catalog and records the buckets the crawler uses.
//
// It loads the databases and the tables of AnalyticsDatabaseDev through
// c.Glue, then for every table:
//   - parses the last character of the table name as the table version,
//   - requires that version to equal the value returned by ResourceVersion,
//   - stores the table's storage location in c.txBucket, c.stakingBucket or
//     c.walletBucket when the name contains "event", "staking" or "wallet".
//
// Finally the "s3://" prefix is stripped from c.txBucket and c.walletBucket.
//
// An error is returned when loading fails, when a table has no name or its
// name does not end in a digit, when ResourceVersion fails, or when the table
// version differs from the resource version.
func (c *EthereumCrawler) Introspect() error {
	l := c.Logger

	if err := c.Glue.LoadDatabases(); err != nil {
		return err
	}

	if err := c.Glue.LoadTables(AnalyticsDatabaseDev); err != nil {
		return err
	}

	// ResourceVersion reads a file from disk and does not depend on the table,
	// so resolve it once instead of once per table. It is skipped when there
	// are no tables, matching the previous behavior for an empty catalog.
	var resourceVersion int
	if len(c.Glue.Tables) > 0 {
		v, err := ResourceVersion()
		if err != nil {
			return err
		}
		resourceVersion = v
	}

	for _, t := range c.Glue.Tables {
		if t.Name == nil || *t.Name == "" {
			return errors.New("table name is empty")
		}

		name := *t.Name

		// The version is the last character of the name. Slice by byte:
		// indexing []rune(name) with the byte length len(name)-1 panics for
		// empty names and for names containing multi-byte characters.
		tableVersion, err := strconv.Atoi(name[len(name)-1:])
		if err != nil {
			return err
		}

		table := Table{
			Name:    name,
			Version: strconv.Itoa(tableVersion),
		}

		if t.DatabaseName != nil {
			table.Database = *t.DatabaseName
		}

		// we expect the table version to match the resource version; otherwise
		// the resource is not ready yet
		if tableVersion != resourceVersion {
			l.Error(fmt.Sprintf("database=%s %s table=%s resource_version=%s \n", AnalyticsDatabaseDev, table.String(), *t.Name, strconv.Itoa(resourceVersion)))
			return errors.New("resource version does not match table version")
		}

		if t.StorageDescriptor.Location != nil {
			table.Bucket = *t.StorageDescriptor.Location
		}

		if serdeName := t.StorageDescriptor.SerdeInfo.Name; serdeName != nil {
			table.SerDe = *serdeName
		} else if lib := t.StorageDescriptor.SerdeInfo.SerializationLibrary; lib != nil {
			// The fourth dot-separated segment of the library name is used as
			// the short serde name (presumably the library is a dotted class
			// name). Fall back to the full value when it has fewer segments
			// rather than indexing out of range.
			if parts := strings.Split(*lib, "."); len(parts) > 3 {
				table.SerDe = parts[3]
			} else {
				table.SerDe = *lib
			}
		}

		switch {
		case strings.Contains(name, "event"):
			c.txBucket = table.Bucket
		case strings.Contains(name, "staking"):
			c.stakingBucket = table.Bucket
		case strings.Contains(name, "wallet"):
			c.walletBucket = table.Bucket
		}
	}

	// TrimPrefix is a no-op when the prefix is absent, so no HasPrefix guard.
	// NOTE(review): c.stakingBucket keeps its "s3://" prefix, as before —
	// confirm that this is intended.
	c.txBucket = strings.TrimPrefix(c.txBucket, "s3://")
	c.walletBucket = strings.TrimPrefix(c.walletBucket, "s3://")

	return nil
}

// ResourceVersion reads common/data/package.json (relative to the working
// directory), decodes it into a PkgJSON and returns the major component of
// its Version field.
//
// It returns an error when the file cannot be read or decoded, when Version
// has fewer than three dot-separated parts, when the first part is not an
// integer, or when the major version is less than 1.
func ResourceVersion() (int, error) {
	raw, err := os.ReadFile("common/data/package.json")
	if err != nil {
		return 0, err
	}

	var manifest PkgJSON
	if err := json.Unmarshal(raw, &manifest); err != nil {
		return 0, err
	}

	parts := strings.Split(manifest.Version, ".")
	if len(parts) < 3 {
		return 0, errors.New("invalid semver")
	}

	major, err := strconv.Atoi(parts[0])
	switch {
	case err != nil:
		return 0, err
	case major < 1:
		return 0, errors.New("major version must be greater than 0")
	}

	return major, nil
}

func (c *EthereumCrawler) Crawl() error {
l := c.Logger

Expand Down Expand Up @@ -416,9 +311,10 @@ func (c *EthereumCrawler) SaveWalletEvents(block int, wallet []*WalletEvent) err
return err
}

c.Logger.Info("uploaded %d wallet events to %s\n", len(wallet), dest)
c.Logger.Info("uploaded block %d to %s\n", block, dest)
return nil
}

func (c *EthereumCrawler) Close() {
c.Wg.Wait()
c.Elapsed = time.Since(c.Begin)
Expand All @@ -440,7 +336,7 @@ func (c *EthereumCrawler) ProcessBlock(height int) ([]*Event, []*WalletEvent, er
return nil, nil, err
}

l.Info("processing_block=%d\n", block.Number().Int64())
l.Info("processing block=%d\n", block.Number().Int64())

blockEvent, err := c.EventFromBlock(block)

Expand All @@ -452,7 +348,7 @@ func (c *EthereumCrawler) ProcessBlock(height int) ([]*Event, []*WalletEvent, er

if block.Transactions().Len() > 0 {
for i, tx := range block.Transactions() {
l.Info("processing tx %d of %d\n", i+1, block.Transactions().Len())
l.Info("processing tx %d of %d in block %d\n", i+1, block.Transactions().Len(), block.Number().Int64())

receipt, err := c.Client.TransactionReceipt(context.Background(), tx.Hash())

Expand Down Expand Up @@ -484,17 +380,17 @@ func (c *EthereumCrawler) EventsFromTransaction(b *types.Block, receipt *types.R
Chain: Ethereum,
Network: c.Network,
Provider: Casimir,
Block: b.Hash().Hex(),
Type: Transaction,
Height: int64(b.Number().Uint64()),
Transaction: tx.Hash().Hex(),
ReceivedAt: time.Unix(int64(b.Time()), 0).Format(AWSAthenaTimeFormat),
ReceivedAt: time.Unix(int64(b.Time()), 0).Format("2006-01-02 15:04:05.999999999"),
}

if tx.Value() != nil {
txEvent.Amount = tx.Value().String()
}

// gas fee = gas price * gas used
txEvent.GasFee = new(big.Int).Mul(tx.GasPrice(), big.NewInt(int64(receipt.GasUsed))).String()

if tx.To() != nil {
Expand Down Expand Up @@ -570,11 +466,119 @@ func (c *EthereumCrawler) EventFromBlock(b *types.Block) (*Event, error) {
Type: Block,
Height: int64(b.Number().Uint64()),
Block: b.Hash().Hex(),
ReceivedAt: time.Unix(int64(b.Time()), 0).Format(AWSAthenaTimeFormat),
ReceivedAt: time.Unix(int64(b.Time()), 0).Format("2006-01-02 15:04:05.999999999"),
}
return &event, nil
}

// Introspect loads the Glue catalog and records the buckets the crawler uses.
//
// It loads the databases and the tables of AnalyticsDatabaseDev through
// c.Glue, then for every table:
//   - parses the last character of the table name as the table version,
//   - requires that version to equal the value returned by ResourceVersion,
//   - stores the table's storage location in c.txBucket, c.stakingBucket or
//     c.walletBucket when the name contains "event", "staking" or "wallet".
//
// Finally the "s3://" prefix is stripped from c.txBucket and c.walletBucket.
//
// An error is returned when loading fails, when a table has no name or its
// name does not end in a digit, when ResourceVersion fails, or when the table
// version differs from the resource version.
func (c *EthereumCrawler) Introspect() error {
	l := c.Logger

	l.Info("introspecting...\n")

	if err := c.Glue.LoadDatabases(); err != nil {
		return err
	}

	if err := c.Glue.LoadTables(AnalyticsDatabaseDev); err != nil {
		return err
	}

	// ResourceVersion reads a file from disk and does not depend on the table,
	// so resolve it once instead of once per table. It is skipped when there
	// are no tables, matching the previous behavior for an empty catalog.
	var resourceVersion int
	if len(c.Glue.Tables) > 0 {
		v, err := ResourceVersion()
		if err != nil {
			return err
		}
		resourceVersion = v
	}

	for _, t := range c.Glue.Tables {
		if t.Name == nil || *t.Name == "" {
			return errors.New("table name is empty")
		}

		name := *t.Name

		// The version is the last character of the name. Slice by byte:
		// indexing []rune(name) with the byte length len(name)-1 panics for
		// empty names and for names containing multi-byte characters.
		tableVersion, err := strconv.Atoi(name[len(name)-1:])
		if err != nil {
			return err
		}

		table := Table{
			Name:    name,
			Version: strconv.Itoa(tableVersion),
		}

		if t.DatabaseName != nil {
			table.Database = *t.DatabaseName
		}

		// we expect the table version to match the resource version; otherwise
		// the resource is not ready yet
		if tableVersion != resourceVersion {
			l.Error(fmt.Sprintf("database=%s %s table=%s resource_version=%s \n", AnalyticsDatabaseDev, table.String(), *t.Name, strconv.Itoa(resourceVersion)))
			return errors.New("resource version does not match table version")
		}

		if t.StorageDescriptor.Location != nil {
			table.Bucket = *t.StorageDescriptor.Location
		}

		if serdeName := t.StorageDescriptor.SerdeInfo.Name; serdeName != nil {
			table.SerDe = *serdeName
		} else if lib := t.StorageDescriptor.SerdeInfo.SerializationLibrary; lib != nil {
			// The fourth dot-separated segment of the library name is used as
			// the short serde name (presumably the library is a dotted class
			// name). Fall back to the full value when it has fewer segments
			// rather than indexing out of range.
			if parts := strings.Split(*lib, "."); len(parts) > 3 {
				table.SerDe = parts[3]
			} else {
				table.SerDe = *lib
			}
		}

		switch {
		case strings.Contains(name, "event"):
			c.txBucket = table.Bucket
		case strings.Contains(name, "staking"):
			c.stakingBucket = table.Bucket
		case strings.Contains(name, "wallet"):
			c.walletBucket = table.Bucket
		}
	}

	// TrimPrefix is a no-op when the prefix is absent, so no HasPrefix guard.
	// NOTE(review): c.stakingBucket keeps its "s3://" prefix, as before —
	// confirm that this is intended.
	c.txBucket = strings.TrimPrefix(c.txBucket, "s3://")
	c.walletBucket = strings.TrimPrefix(c.walletBucket, "s3://")

	return nil
}

// ResourceVersion reads common/data/package.json (relative to the working
// directory), decodes it into a PkgJSON and returns the major component of
// its Version field.
//
// It returns an error when the file cannot be read or decoded, when Version
// has fewer than three dot-separated parts, when the first part is not an
// integer, or when the major version is less than 1.
func ResourceVersion() (int, error) {
	raw, err := os.ReadFile("common/data/package.json")
	if err != nil {
		return 0, err
	}

	var manifest PkgJSON
	if err := json.Unmarshal(raw, &manifest); err != nil {
		return 0, err
	}

	parts := strings.Split(manifest.Version, ".")
	if len(parts) < 3 {
		return 0, errors.New("invalid semver")
	}

	major, err := strconv.Atoi(parts[0])
	switch {
	case err != nil:
		return 0, err
	case major < 1:
		return 0, errors.New("major version must be greater than 0")
	}

	return major, nil
}

// String renders the table as space-separated key=value pairs in the order
// table, version, database, bucket, serde.
func (t Table) String() string {
	const layout = "table=%s version=%s database=%s bucket=%s serde=%s"

	return fmt.Sprintf(layout, t.Name, t.Version, t.Database, t.Bucket, t.SerDe)
}
9 changes: 7 additions & 2 deletions services/crawler/ethereum_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"net/url"
"os"
"testing"
Expand All @@ -27,7 +28,11 @@ func TestNewEtheruemClient(t *testing.T) {
t.Fatal(err)
}

if client == nil {
t.Fatal("client is nil")
head, err := client.Client.HeaderByNumber(context.Background(), nil)

if err != nil {
t.Fatal(err)
}

t.Log(head.Number.String())
}
20 changes: 13 additions & 7 deletions services/crawler/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,16 @@ func (c CryptoCompareExchange) HistoricalPrice(coin ChainType, currency Currency

path := "index/cc/v1/historical/minutes"

// to_ts (timestamp) returns historical data before this unix timestamp
// use limit=2000 and keep going back in time using the to_ts param.
url := fmt.Sprintf("%s/%s?market=ccix&instruments=%s-%s&limit=20&to_ts=%d", c.BaseUrl, path, coin.Short(), currency.String(), received.Unix())
limit := 20

fmt.Println(received)
// convert the time.Time to a unix timestamp
unixTime := received.UTC().Unix()

url := fmt.Sprintf("%s/%s?market=ccix&instrument=%s-%s&limit=%d&to_ts=%d", c.BaseUrl, path, coin.Short(), currency.String(), limit, unixTime)

fmt.Println(unixTime)
fmt.Println(url)

req, err := httpClient.Get(url)

Expand Down Expand Up @@ -265,11 +272,10 @@ func (c ChainType) Short() string {
switch c {
case Ethereum:
return "ETH"
case Iotex:
return "IOTX"

case Bitcoin:
return "BTC"
default:
panic("invalid chain type")
return ""
}
}

Expand Down
Loading

0 comments on commit 6fb214f

Please sign in to comment.