From 99749ecb68fc94d112b2edf77d65126cedc140cb Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Tue, 14 Feb 2023 16:26:37 +0100 Subject: [PATCH 1/3] cmd/devp2p: parallelised crawling --- cmd/devp2p/crawl.go | 57 +++++++++++++++++++++++++++++---------------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/cmd/devp2p/crawl.go b/cmd/devp2p/crawl.go index 0d8127684e66..9f0cb50dbed0 100644 --- a/cmd/devp2p/crawl.go +++ b/cmd/devp2p/crawl.go @@ -21,6 +21,8 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" + "sync" + "sync/atomic" ) type crawler struct { @@ -34,6 +36,7 @@ type crawler struct { // settings revalidateInterval time.Duration + mu sync.RWMutex } const ( @@ -82,28 +85,35 @@ func (c *crawler) run(timeout time.Duration) nodeSet { } var ( - added int - updated int - skipped int - recent int - removed int + added uint64 + updated uint64 + skipped uint64 + recent uint64 + removed uint64 + nthreads = 16 ) + for i := 0; i < nthreads; i++ { + go func() { + for n := range c.ch { + switch c.updateNode(n) { + case nodeSkipIncompat: + atomic.AddUint64(&skipped, 1) + case nodeSkipRecent: + atomic.AddUint64(&recent, 1) + case nodeRemoved: + atomic.AddUint64(&removed, 1) + case nodeAdded: + atomic.AddUint64(&added, 1) + default: + atomic.AddUint64(&updated, 1) + } + } + }() + } + loop: for { select { - case n := <-c.ch: - switch c.updateNode(n) { - case nodeSkipIncompat: - skipped++ - case nodeSkipRecent: - recent++ - case nodeRemoved: - removed++ - case nodeAdded: - added++ - default: - updated++ - } case it := <-doneCh: if it == c.inputIter { // Enable timeout when we're done revalidating the input nodes. @@ -119,8 +129,11 @@ loop: break loop case <-statusTicker.C: log.Info("Crawling in progress", - "added", added, "updated", updated, "removed", removed, - "ignored(recent)", recent, "ignored(incompatible)", skipped) + "added", atomic.LoadUint64(&added), + "updated", atomic.LoadUint64(&updated), + "removed", atomic.LoadUint64(&removed), + "ignored(recent)", atomic.LoadUint64(&removed), + "ignored(incompatible)", atomic.LoadUint64(&skipped)) } } @@ -148,7 +161,9 @@ func (c *crawler) runIterator(done chan<- enode.Iterator, it enode.Iterator) { // updateNode updates the info about the given node, and returns a status // about what changed func (c *crawler) updateNode(n *enode.Node) int { + c.mu.RLock() node, ok := c.output[n.ID()] + c.mu.RUnlock() // Skip validation of recently-seen nodes. if ok && time.Since(node.LastCheck) < c.revalidateInterval { @@ -178,6 +193,8 @@ func (c *crawler) updateNode(n *enode.Node) int { } // Store/update node in output set. + c.mu.Lock() + defer c.mu.Unlock() if node.Score <= 0 { log.Debug("Removing node", "id", n.ID()) delete(c.output, n.ID()) From 05db4554c1ef6b17b53d86598dcd6647cc96dc6a Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Wed, 15 Feb 2023 09:58:22 +0100 Subject: [PATCH 2/3] cmd/devp2p: fix parallel crawling + lower reporting verbosity --- cmd/devp2p/crawl.go | 61 +++++++++++++++++++++--------------- cmd/devp2p/discv4cmd.go | 11 +++++-- cmd/devp2p/discv5cmd.go | 2 +- cmd/devp2p/dns_cloudflare.go | 19 ++++++++--- cmd/devp2p/dns_route53.go | 5 +-- 5 files changed, 62 insertions(+), 36 deletions(-) diff --git a/cmd/devp2p/crawl.go b/cmd/devp2p/crawl.go index 9f0cb50dbed0..cdb42e4f3f65 100644 --- a/cmd/devp2p/crawl.go +++ b/cmd/devp2p/crawl.go @@ -17,12 +17,13 @@ package main import ( + "fmt" + "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" - "sync" - "sync/atomic" ) type crawler struct { @@ -70,7 +71,7 @@ func newCrawler(input nodeSet, disc resolver, iters ...enode.Iterator) *crawler return c } -func (c *crawler) run(timeout time.Duration) nodeSet { +func (c *crawler) run(timeout time.Duration, nthreads int) nodeSet { var ( timeoutTimer = time.NewTimer(timeout) timeoutCh <-chan time.Time @@ -78,34 +79,43 @@ func (c *crawler) run(timeout time.Duration) nodeSet { doneCh = make(chan enode.Iterator, len(c.iters)) liveIters = len(c.iters) ) + if nthreads < 1 { + nthreads = 1 + } defer timeoutTimer.Stop() defer statusTicker.Stop() for _, it := range c.iters { go c.runIterator(doneCh, it) } - var ( - added uint64 - updated uint64 - skipped uint64 - recent uint64 - removed uint64 - nthreads = 16 + added uint64 + updated uint64 + skipped uint64 + recent uint64 + removed uint64 + wg sync.WaitGroup ) + wg.Add(nthreads) for i := 0; i < nthreads; i++ { go func() { - for n := range c.ch { - switch c.updateNode(n) { - case nodeSkipIncompat: - atomic.AddUint64(&skipped, 1) - case nodeSkipRecent: - atomic.AddUint64(&recent, 1) - case nodeRemoved: - atomic.AddUint64(&removed, 1) - case nodeAdded: - atomic.AddUint64(&added, 1) - default: - atomic.AddUint64(&updated, 1) + defer wg.Done() + for { + select { + case n := <-c.ch: + switch c.updateNode(n) { + case nodeSkipIncompat: + atomic.AddUint64(&skipped, 1) + case nodeSkipRecent: + atomic.AddUint64(&recent, 1) + case nodeRemoved: + atomic.AddUint64(&removed, 1) + case nodeAdded: + atomic.AddUint64(&added, 1) + default: + atomic.AddUint64(&updated, 1) + } + case <-c.closed: + return } } }() @@ -144,6 +154,7 @@ loop: for ; liveIters > 0; liveIters-- { <-doneCh } + wg.Wait() return c.output } @@ -171,10 +182,9 @@ func (c *crawler) updateNode(n *enode.Node) int { } // Request the node record. - nn, err := c.disc.RequestENR(n) - node.LastCheck = truncNow() status := nodeUpdated - if err != nil { + node.LastCheck = truncNow() + if nn, err := c.disc.RequestENR(n); err != nil { if node.Score == 0 { // Node doesn't implement EIP-868. log.Debug("Skipping node", "id", n.ID()) @@ -191,7 +201,6 @@ func (c *crawler) updateNode(n *enode.Node) int { } node.LastResponse = node.LastCheck } - // Store/update node in output set. c.mu.Lock() defer c.mu.Unlock() diff --git a/cmd/devp2p/discv4cmd.go b/cmd/devp2p/discv4cmd.go index 94e61c36f325..63122634780d 100644 --- a/cmd/devp2p/discv4cmd.go +++ b/cmd/devp2p/discv4cmd.go @@ -78,7 +78,7 @@ var ( Name: "crawl", Usage: "Updates a nodes.json file with random nodes found in the DHT", Action: discv4Crawl, - Flags: flags.Merge(discoveryNodeFlags, []cli.Flag{crawlTimeoutFlag}), + Flags: flags.Merge(discoveryNodeFlags, []cli.Flag{crawlTimeoutFlag, crawlParallelismFlag}), } discv4TestCommand = &cli.Command{ Name: "test", @@ -120,6 +120,11 @@ var ( Usage: "Time limit for the crawl.", Value: 30 * time.Minute, } + crawlParallelismFlag = &cli.IntFlag{ + Name: "parallel", + Usage: "How many parallel discoveries to attempt.", + Value: 16, + } remoteEnodeFlag = &cli.StringFlag{ Name: "remote", Usage: "Enode of the remote node under test", @@ -195,7 +200,7 @@ func discv4ResolveJSON(ctx *cli.Context) error { defer disc.Close() c := newCrawler(inputSet, disc, enode.IterNodes(nodeargs)) c.revalidateInterval = 0 - output := c.run(0) + output := c.run(0, 1) writeNodesJSON(nodesFile, output) return nil } @@ -214,7 +219,7 @@ func discv4Crawl(ctx *cli.Context) error { defer disc.Close() c := newCrawler(inputSet, disc, disc.RandomNodes()) c.revalidateInterval = 10 * time.Minute - output := c.run(ctx.Duration(crawlTimeoutFlag.Name)) + output := c.run(ctx.Duration(crawlTimeoutFlag.Name), ctx.Int(crawlParallelismFlag.Name)) writeNodesJSON(nodesFile, output) return nil } diff --git a/cmd/devp2p/discv5cmd.go b/cmd/devp2p/discv5cmd.go index 343e2a0d5d42..832a4bc1f426 100644 --- a/cmd/devp2p/discv5cmd.go +++ b/cmd/devp2p/discv5cmd.go @@ -110,7 +110,7 @@ func discv5Crawl(ctx *cli.Context) error { defer disc.Close() c := newCrawler(inputSet, disc, disc.RandomNodes()) c.revalidateInterval = 10 * time.Minute - output := c.run(ctx.Duration(crawlTimeoutFlag.Name)) + output := c.run(ctx.Duration(crawlTimeoutFlag.Name), ctx.Int(crawlParallelismFlag.Name)) writeNodesJSON(nodesFile, output) return nil } diff --git a/cmd/devp2p/dns_cloudflare.go b/cmd/devp2p/dns_cloudflare.go index 92c6faf272ec..bfe92257ee5a 100644 --- a/cmd/devp2p/dns_cloudflare.go +++ b/cmd/devp2p/dns_cloudflare.go @@ -126,11 +126,16 @@ func (c *cloudflareClient) uploadRecords(name string, records map[string]string) } // Iterate over the new records and inject anything missing. + log.Info("Updating DNS entries") + created := 0 + updated := 0 + skipped := 0 for path, val := range records { old, exists := existing[path] if !exists { // Entry is unknown, push a new one to Cloudflare. - log.Info(fmt.Sprintf("Creating %s = %q", path, val)) + log.Debug(fmt.Sprintf("Creating %s = %q", path, val)) + created++ ttl := rootTTL if path != name { ttl = treeNodeTTLCloudflare // Max TTL permitted by Cloudflare @@ -139,27 +144,33 @@ func (c *cloudflareClient) uploadRecords(name string, records map[string]string) _, err = c.CreateDNSRecord(context.Background(), c.zoneID, record) } else if old.Content != val { // Entry already exists, only change its content. - log.Info(fmt.Sprintf("Updating %s from %q to %q", path, old.Content, val)) + log.Debug(fmt.Sprintf("Updating %s from %q to %q", path, old.Content, val)) + updated++ old.Content = val err = c.UpdateDNSRecord(context.Background(), c.zoneID, old.ID, old) } else { + skipped++ log.Debug(fmt.Sprintf("Skipping %s = %q", path, val)) } if err != nil { return fmt.Errorf("failed to publish %s: %v", path, err) } } - + log.Info("Updated DNS entries", "new", created, "updated", updated, "untouched", skipped) // Iterate over the old records and delete anything stale. + deleted := 0 + log.Info("Deleting stale DNS entries") for path, entry := range existing { if _, ok := records[path]; ok { continue } // Stale entry, nuke it. - log.Info(fmt.Sprintf("Deleting %s = %q", path, entry.Content)) + log.Debug(fmt.Sprintf("Deleting %s = %q", path, entry.Content)) + deleted++ if err := c.DeleteDNSRecord(context.Background(), c.zoneID, entry.ID); err != nil { return fmt.Errorf("failed to delete %s: %v", path, err) } } + log.Info("Deleted stale DNS entries", "count", deleted) return nil } diff --git a/cmd/devp2p/dns_route53.go b/cmd/devp2p/dns_route53.go index 4aab0856ff90..81734eb2ad87 100644 --- a/cmd/devp2p/dns_route53.go +++ b/cmd/devp2p/dns_route53.go @@ -329,8 +329,9 @@ func (c *route53Client) collectRecords(name string) (map[string]recordSet, error var req route53.ListResourceRecordSetsInput req.HostedZoneId = &c.zoneID existing := make(map[string]recordSet) + log.Info("Loading existing TXT records", "name", name, "zone", c.zoneID) for page := 0; ; page++ { - log.Info("Loading existing TXT records", "name", name, "zone", c.zoneID, "page", page) + log.Debug("Loading existing TXT records", "name", name, "zone", c.zoneID, "page", page) resp, err := c.api.ListResourceRecordSets(context.TODO(), &req) if err != nil { return existing, err @@ -360,7 +361,7 @@ func (c *route53Client) collectRecords(name string) (map[string]recordSet, error req.StartRecordName = resp.NextRecordName req.StartRecordType = resp.NextRecordType } - + log.Info("Loaded existing TXT records", "name", name, "zone", c.zoneID, "records", len(existing)) return existing, nil } From 8302031828ac81bd40ac7ac43603741998dd9ef6 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Wed, 15 Feb 2023 10:07:03 +0100 Subject: [PATCH 3/3] f --- cmd/devp2p/crawl.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/devp2p/crawl.go b/cmd/devp2p/crawl.go index cdb42e4f3f65..8c9755ac1cd4 100644 --- a/cmd/devp2p/crawl.go +++ b/cmd/devp2p/crawl.go @@ -17,7 +17,6 @@ package main import ( - "fmt" "sync" "sync/atomic" "time"