Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/devp2p: use AWS-SDK v2 #22360

Merged
merged 7 commits into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 87 additions & 49 deletions cmd/devp2p/dns_route53.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@
package main

import (
"context"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/route53"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/route53"
"github.com/aws/aws-sdk-go-v2/service/route53/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/dnsdisc"
"gopkg.in/urfave/cli.v1"
Expand All @@ -38,6 +41,7 @@ const (
// https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/DNSLimitations.html#limits-api-requests-changeresourcerecordsets
route53ChangeSizeLimit = 32000
route53ChangeCountLimit = 1000
maxRetryLimit = 60
)

var (
Expand All @@ -58,7 +62,7 @@ var (
)

type route53Client struct {
api *route53.Route53
api *route53.Client
zoneID string
}

Expand All @@ -74,13 +78,13 @@ func newRoute53Client(ctx *cli.Context) *route53Client {
if akey == "" || asec == "" {
exit(fmt.Errorf("need Route53 Access Key ID and secret proceed"))
}
config := &aws.Config{Credentials: credentials.NewStaticCredentials(akey, asec, "")}
session, err := session.NewSession(config)
creds := aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(akey, asec, ""))
cfg, err := config.LoadDefaultConfig(context.Background(), config.WithCredentialsProvider(creds))
if err != nil {
exit(fmt.Errorf("can't create AWS session: %v", err))
exit(fmt.Errorf("can't initialize AWS configuration: %v", err))
}
return &route53Client{
api: route53.New(session),
api: route53.NewFromConfig(cfg),
zoneID: ctx.String(route53ZoneIDFlag.Name),
}
}
Expand All @@ -105,25 +109,43 @@ func (c *route53Client) deploy(name string, t *dnsdisc.Tree) error {
return nil
}

// Submit change batches.
// Submit all change batches.
batches := splitChanges(changes, route53ChangeSizeLimit, route53ChangeCountLimit)
changesToCheck := make([]*route53.ChangeResourceRecordSetsOutput, len(batches))
for i, changes := range batches {
Copy link
Contributor Author

@qhenkart qhenkart Feb 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this loop can be optimized, mostly because the waiter is blocking (this is the case in the current version as well as in this PR).

Currently for every batch, it makes the change and then waits up to 30 minutes for the change to propagate before making the next change for the next batch. In a worse case scenario, this loop could block a command for hours.

My proposal is that either

  1. we loop through the batch and make all of the changes, then block the thread until all changes are made. This way the bottleneck only exists on the AWS side of how fast changes are propagated

  2. create a go-routine for each batch. This way each routine is responsible for making the change and then waiting for the propagation. The thread would lock until all of the routines have broken out of the wait loop. Like option 1, the bottleneck would only exist on the AWS side.

option 2 would be more elegant, both options would be a dramatic optimization. However I am not clear if this is necessary or desired. If it is I can include those changes in this PR or a different one (I can also create the optimization with the v1 sdk)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to make this change, I prefer option 1. There can be a lot of changes, and starting a goroutine for each has its own problems.

Copy link
Contributor Author

@qhenkart qhenkart Feb 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fjl I have implemented option 1. Propagation takes an average of 60 seconds, according to AWS, so now instead of waiting ~ 60-90 seconds per change, ideally it would take 60-90 seconds for the first change and then the rest of the waiters should be successful on the first request

log.Info(fmt.Sprintf("Submitting %d changes to Route53", len(changes)))
batch := new(route53.ChangeBatch)
batch.SetChanges(changes)
batch.SetComment(fmt.Sprintf("enrtree update %d/%d of %s at seq %d", i+1, len(batches), name, t.Seq()))
batch := &types.ChangeBatch{
Changes: changes,
Comment: aws.String(fmt.Sprintf("enrtree update %d/%d of %s at seq %d", i+1, len(batches), name, t.Seq())),
}
req := &route53.ChangeResourceRecordSetsInput{HostedZoneId: &c.zoneID, ChangeBatch: batch}
resp, err := c.api.ChangeResourceRecordSets(req)
changesToCheck[i], err = c.api.ChangeResourceRecordSets(context.TODO(), req)
if err != nil {
return err
}
}

log.Info(fmt.Sprintf("Waiting for change request %s", *resp.ChangeInfo.Id))
wreq := &route53.GetChangeInput{Id: resp.ChangeInfo.Id}
if err := c.api.WaitUntilResourceRecordSetsChanged(wreq); err != nil {
return err
// wait for all change batches to propagate
for _, change := range changesToCheck {
log.Info(fmt.Sprintf("Waiting for change request %s", *change.ChangeInfo.Id))
wreq := &route53.GetChangeInput{Id: change.ChangeInfo.Id}
var count int
for {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waiters have been refactored in v2. They are no longer local extensions of the request.Waiter type. V2 does have support for waiters but limited to certain packages and functions.

Please see how the previous waiter was implemented
Like my implementation here, you can see that the waiter checks the status 60 times in 30 second increments

v2 waiter implementations are handled in a similar fashion

wresp, err := c.api.GetChange(context.TODO(), wreq)
if err != nil {
return err
}

count++

if wresp.ChangeInfo.Status == types.ChangeStatusInsync || count >= maxRetryLimit {
break
}

time.Sleep(30 * time.Second)
}
}

return nil
}

Expand All @@ -140,7 +162,7 @@ func (c *route53Client) findZoneID(name string) (string, error) {
log.Info(fmt.Sprintf("Finding Route53 Zone ID for %s", name))
var req route53.ListHostedZonesByNameInput
for {
resp, err := c.api.ListHostedZonesByName(&req)
resp, err := c.api.ListHostedZonesByName(context.TODO(), &req)
if err != nil {
return "", err
}
Expand All @@ -149,7 +171,7 @@ func (c *route53Client) findZoneID(name string) (string, error) {
return *zone.Id, nil
}
}
if !*resp.IsTruncated {
if !resp.IsTruncated {
break
}
req.DNSName = resp.NextDNSName
Expand All @@ -159,15 +181,15 @@ func (c *route53Client) findZoneID(name string) (string, error) {
}

// computeChanges creates DNS changes for the given record.
func (c *route53Client) computeChanges(name string, records map[string]string, existing map[string]recordSet) []*route53.Change {
func (c *route53Client) computeChanges(name string, records map[string]string, existing map[string]recordSet) []types.Change {
// Convert all names to lowercase.
lrecords := make(map[string]string, len(records))
for name, r := range records {
lrecords[strings.ToLower(name)] = r
}
records = lrecords

var changes []*route53.Change
var changes []types.Change
for path, val := range records {
ttl := int64(rootTTL)
if path != name {
Expand Down Expand Up @@ -204,21 +226,21 @@ func (c *route53Client) computeChanges(name string, records map[string]string, e
}

// sortChanges ensures DNS changes are in leaf-added -> root-changed -> leaf-deleted order.
func sortChanges(changes []*route53.Change) {
func sortChanges(changes []types.Change) {
score := map[string]int{"CREATE": 1, "UPSERT": 2, "DELETE": 3}
sort.Slice(changes, func(i, j int) bool {
if *changes[i].Action == *changes[j].Action {
if changes[i].Action == changes[j].Action {
return *changes[i].ResourceRecordSet.Name < *changes[j].ResourceRecordSet.Name
}
return score[*changes[i].Action] < score[*changes[j].Action]
return score[string(changes[i].Action)] < score[string(changes[j].Action)]
})
}

// splitChanges splits up DNS changes such that each change batch
// is smaller than the given RDATA limit.
func splitChanges(changes []*route53.Change, sizeLimit, countLimit int) [][]*route53.Change {
func splitChanges(changes []types.Change, sizeLimit, countLimit int) [][]types.Change {
var (
batches [][]*route53.Change
batches [][]types.Change
batchSize int
batchCount int
)
Expand All @@ -241,7 +263,7 @@ func splitChanges(changes []*route53.Change, sizeLimit, countLimit int) [][]*rou
}

// changeSize returns the RDATA size of a DNS change.
func changeSize(ch *route53.Change) int {
func changeSize(ch types.Change) int {
size := 0
for _, rr := range ch.ResourceRecordSet.ResourceRecords {
if rr.Value != nil {
Expand All @@ -251,8 +273,8 @@ func changeSize(ch *route53.Change) int {
return size
}

func changeCount(ch *route53.Change) int {
if *ch.Action == "UPSERT" {
func changeCount(ch types.Change) int {
if ch.Action == types.ChangeActionUpsert {
return 2
}
return 1
Expand All @@ -262,42 +284,58 @@ func changeCount(ch *route53.Change) int {
func (c *route53Client) collectRecords(name string) (map[string]recordSet, error) {
log.Info(fmt.Sprintf("Retrieving existing TXT records on %s (%s)", name, c.zoneID))
var req route53.ListResourceRecordSetsInput
req.SetHostedZoneId(c.zoneID)
req.HostedZoneId = &c.zoneID
existing := make(map[string]recordSet)
err := c.api.ListResourceRecordSetsPages(&req, func(resp *route53.ListResourceRecordSetsOutput, last bool) bool {
for {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

callback paginators are no longer implemented in v2, utilizing cursors instead. This refactor actually follows the same logical flow as an already existing pagination implementation

https://github.com/ethereum/go-ethereum/pull/22360/files#diff-2ca52123d993656e6b05df9dd124a36b2be796acfa612f371c3c90afc68b24b2L142

resp, err := c.api.ListResourceRecordSets(context.TODO(), &req)
if err != nil {
return existing, err
}

for _, set := range resp.ResourceRecordSets {
if !isSubdomain(*set.Name, name) || *set.Type != "TXT" {
if !isSubdomain(*set.Name, name) || set.Type != types.RRTypeTxt {
continue
}

s := recordSet{ttl: *set.TTL}
for _, rec := range set.ResourceRecords {
s.values = append(s.values, *rec.Value)
}
name := strings.TrimSuffix(*set.Name, ".")
existing[name] = s
}
return true
})
return existing, err

if !resp.IsTruncated {
break
}

// sets the cursor to the next batch
req.StartRecordIdentifier = resp.NextRecordIdentifier
}

return existing, nil
}

// newTXTChange creates a change to a TXT record.
func newTXTChange(action, name string, ttl int64, values ...string) *route53.Change {
var c route53.Change
var r route53.ResourceRecordSet
var rrs []*route53.ResourceRecord
func newTXTChange(action, name string, ttl int64, values ...string) types.Change {
r := types.ResourceRecordSet{
Type: types.RRTypeTxt,
Name: &name,
TTL: &ttl,
}
var rrs []types.ResourceRecord
for _, val := range values {
rr := new(route53.ResourceRecord)
rr.SetValue(val)
var rr types.ResourceRecord
rr.Value = aws.String(val)
rrs = append(rrs, rr)
}
r.SetType("TXT")
r.SetName(name)
r.SetTTL(ttl)
r.SetResourceRecords(rrs)
c.SetAction(action)
c.SetResourceRecordSet(&r)
return &c

r.ResourceRecords = rrs

return types.Change{
Action: types.ChangeAction(action),
ResourceRecordSet: &r,
}
}

// isSubdomain returns true if name is a subdomain of domain.
Expand Down
Loading