Skip to content

Commit

Permalink
Add exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
hawyar committed Jun 26, 2023
1 parent faa64cf commit 19cba3e
Show file tree
Hide file tree
Showing 4 changed files with 345 additions and 104 deletions.
46 changes: 12 additions & 34 deletions services/crawler/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,43 +70,21 @@ func RootCmd(c *cli.Context) error {

defer crawler.Close()

return nil

// streamer, err := NewEtheruemStreamer()
streamer, err := NewEtheruemStreamer()

// if err != nil {
// return err
// }
if err != nil {
return err
}

// for now use crawler's introspect and s3 client rather than recreating them
// streamer.Glue = crawler.Glue
// streamer.S3 = crawler.S3

// err = streamer.Stream()

// if err != nil {
// return err
// }
}

// func RootCmd(c *cli.Context) error {
// crawler, err := NewCrawler()
streamer.Glue = crawler.Glue
streamer.S3 = crawler.S3

// if err != nil {
// return err
// }
err = streamer.Stream()

// _, err = crawler.Introspect()

// if err != nil {
// return err
// }

// err = crawler.Crawl()

// if err != nil {
// return err
// }
if err != nil {
return err
}

// return nil
// }
return nil
}
93 changes: 23 additions & 70 deletions services/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (c *EtheruemCrawler) Crawl() error {

l.Info("crawling %d blocks...\n", c.Head+1)

step := 500000
step := 250000

for i := int(c.Head); i >= 0; i -= step {
start := i
Expand All @@ -185,80 +185,33 @@ func (c *EtheruemCrawler) Crawl() error {
end = 0
}

go func(start, end int) {
defer func() {
c.Wg.Done()
<-c.Sema
}()
l.Info("batch=%d start=%d end=%d\n", i/step, start, end)

l.Info("batch=%d start=%d end=%d\n", i/step, start, end)
// go func(start, end int) {
// defer func() {
// c.Wg.Done()
// <-c.Sema
// }()

for j := start; j >= end; j-- {
events, err := c.ProcessBlock(int(j))
// l.Info("batch=%d start=%d end=%d\n", i/step, start, end)

if err != nil {
l.Error("error processing block=%d err=%s\n", j, err)
}
// for j := start; j >= end; j-- {
// events, err := c.ProcessBlock(int(j))

l.Info("captured %d events", len(events))
// ndjson, err := EncodeToNDJSONBytes(events)
// if err != nil {
// l.Error("error processing block=%d err=%s\n", j, err)
// }

// if err != nil {
// l.Error("error encoding events to ndjson err=%s\n", err)
// }
}
}(start, end)
// l.Info("captured %d events", len(events))
// // ndjson, err := EncodeToNDJSONBytes(events)

// // if err != nil {
// // l.Error("error encoding events to ndjson err=%s\n", err)
// // }
// }
// }(start, end)
}
return nil

// go func(start, end uint64) {
// defer func() {
// <-c.Sema
// c.Wg.Done()
// }()
// l.Info("batch=%d start=%d end=%d\n", i/step, start, end)

// for j := start; j <= end; j++ {
// events, err := c.ProcessBlock(int(j))

// if err != nil {
// l.Error("error processing block=%d err=%s\n", j, err)
// continue
// }

// ndjson, err := EncodeToNDJSONBytes(events)

// if err != nil {
// l.Error("error encoding events to ndjson err=%s\n", err)
// continue
// }

// // bucketPath := fmt.Sprintf("blocks/%d.ndjson", j)
// // tables := c.Glue.Tables
// // err = c.S3.UploadBytes(s)

// // save to local file
// fpath := fmt.Sprintf("data/%d-%d.ndjson", start, end)

// f, err := os.OpenFile(fpath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)

// if err != nil {
// l.Error("error opening file=%s err=%s\n", fpath, err)
// continue
// }

// _, err = f.Write(ndjson)

// if err != nil {
// l.Error("error writing to file=%s err=%s\n", fpath, err)
// continue
// }

// c.Mutex.Lock()
// c.EventsConsumed++
// c.Mutex.Unlock()
// }
// }(start, end)
}

func (c *EtheruemCrawler) Introspect() (map[string]Table, error) {
Expand Down Expand Up @@ -313,11 +266,11 @@ func (c *EtheruemCrawler) Introspect() (map[string]Table, error) {
table.SerDe = *t.StorageDescriptor.SerdeInfo.Name
}

if strings.Contains(*t.Name, "_event_") {
if strings.Contains(*t.Name, "event") {
tables["events"] = table
} else if strings.Contains(*t.Name, "staking_action") {
tables["staking"] = table
} else if strings.Contains(*t.Name, "_wallet_") {
} else if strings.Contains(*t.Name, "wallet") {
tables["wallet"] = table
}
}
Expand Down
Loading

0 comments on commit 19cba3e

Please sign in to comment.