cmd/devp2p: use AWS-SDK v2 #22360
Changes from all commits: fa65908, eddd9f3, 4690361, e03f7b6, d3b62d0, ccf0a43, eace772
@@ -17,16 +17,19 @@
 package main
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"sort"
 	"strconv"
 	"strings"
+	"time"
 
-	"github.com/aws/aws-sdk-go/aws"
-	"github.com/aws/aws-sdk-go/aws/credentials"
-	"github.com/aws/aws-sdk-go/aws/session"
-	"github.com/aws/aws-sdk-go/service/route53"
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/credentials"
+	"github.com/aws/aws-sdk-go-v2/service/route53"
+	"github.com/aws/aws-sdk-go-v2/service/route53/types"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/p2p/dnsdisc"
 	"gopkg.in/urfave/cli.v1"
@@ -38,6 +41,7 @@ const (
 	// https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/DNSLimitations.html#limits-api-requests-changeresourcerecordsets
 	route53ChangeSizeLimit  = 32000
 	route53ChangeCountLimit = 1000
+	maxRetryLimit           = 60
 )
 
 var (
@@ -58,7 +62,7 @@ var (
 )
 
 type route53Client struct {
-	api    *route53.Route53
+	api    *route53.Client
 	zoneID string
 }
@@ -74,13 +78,13 @@ func newRoute53Client(ctx *cli.Context) *route53Client {
 	if akey == "" || asec == "" {
 		exit(fmt.Errorf("need Route53 Access Key ID and secret proceed"))
 	}
-	config := &aws.Config{Credentials: credentials.NewStaticCredentials(akey, asec, "")}
-	session, err := session.NewSession(config)
+	creds := aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(akey, asec, ""))
+	cfg, err := config.LoadDefaultConfig(context.Background(), config.WithCredentialsProvider(creds))
 	if err != nil {
-		exit(fmt.Errorf("can't create AWS session: %v", err))
+		exit(fmt.Errorf("can't initialize AWS configuration: %v", err))
 	}
 	return &route53Client{
-		api:    route53.New(session),
+		api:    route53.NewFromConfig(cfg),
 		zoneID: ctx.String(route53ZoneIDFlag.Name),
 	}
 }
@@ -105,25 +109,43 @@ func (c *route53Client) deploy(name string, t *dnsdisc.Tree) error {
 		return nil
 	}
 
-	// Submit change batches.
+	// Submit all change batches.
 	batches := splitChanges(changes, route53ChangeSizeLimit, route53ChangeCountLimit)
+	changesToCheck := make([]*route53.ChangeResourceRecordSetsOutput, len(batches))
 	for i, changes := range batches {
 		log.Info(fmt.Sprintf("Submitting %d changes to Route53", len(changes)))
-		batch := new(route53.ChangeBatch)
-		batch.SetChanges(changes)
-		batch.SetComment(fmt.Sprintf("enrtree update %d/%d of %s at seq %d", i+1, len(batches), name, t.Seq()))
+		batch := &types.ChangeBatch{
+			Changes: changes,
+			Comment: aws.String(fmt.Sprintf("enrtree update %d/%d of %s at seq %d", i+1, len(batches), name, t.Seq())),
+		}
 		req := &route53.ChangeResourceRecordSetsInput{HostedZoneId: &c.zoneID, ChangeBatch: batch}
-		resp, err := c.api.ChangeResourceRecordSets(req)
+		changesToCheck[i], err = c.api.ChangeResourceRecordSets(context.TODO(), req)
 		if err != nil {
 			return err
 		}
 	}
 
-	log.Info(fmt.Sprintf("Waiting for change request %s", *resp.ChangeInfo.Id))
-	wreq := &route53.GetChangeInput{Id: resp.ChangeInfo.Id}
-	if err := c.api.WaitUntilResourceRecordSetsChanged(wreq); err != nil {
-		return err
+	// wait for all change batches to propagate
+	for _, change := range changesToCheck {
+		log.Info(fmt.Sprintf("Waiting for change request %s", *change.ChangeInfo.Id))
+		wreq := &route53.GetChangeInput{Id: change.ChangeInfo.Id}
+		var count int
+
+		for {
+			wresp, err := c.api.GetChange(context.TODO(), wreq)
+			if err != nil {
+				return err
+			}
+
+			count++
+
+			if wresp.ChangeInfo.Status == types.ChangeStatusInsync || count >= maxRetryLimit {
+				break
+			}
+
+			time.Sleep(30 * time.Second)
+		}
 	}
 
 	return nil
 }

Review comment (on the polling loop above): Waiters have been refactored in v2; they are no longer local extensions of the request.Waiter type. v2 does have support for waiters, but only for certain packages and operations, so the previous WaitUntilResourceRecordSetsChanged call is replaced here with a manual polling loop. Compare how the previous waiter was implemented: v2 waiter implementations are handled in a similar fashion.
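Since no generated waiter covered this operation at the time, the diff polls GetChange by hand. Here is a standalone sketch of that polling pattern; waitUntil, checkDone, and the timings are hypothetical names chosen for illustration, not part of the PR:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitUntil polls checkDone until it reports success, fails, or the retry
// budget is exhausted. This mirrors the manual waiter loop in the diff,
// where checkDone stands in for the GetChange status check.
func waitUntil(checkDone func() (bool, error), interval time.Duration, maxRetries int) error {
	for i := 0; i < maxRetries; i++ {
		done, err := checkDone()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		time.Sleep(interval)
	}
	return errors.New("change did not propagate within the retry limit")
}

func main() {
	polls := 0
	err := waitUntil(func() (bool, error) {
		polls++
		return polls >= 3, nil // pretend the change is INSYNC on the third poll
	}, 10*time.Millisecond, 60)
	fmt.Println("waited, err =", err)
}
```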
@@ -140,7 +162,7 @@ func (c *route53Client) findZoneID(name string) (string, error) {
 	log.Info(fmt.Sprintf("Finding Route53 Zone ID for %s", name))
 	var req route53.ListHostedZonesByNameInput
 	for {
-		resp, err := c.api.ListHostedZonesByName(&req)
+		resp, err := c.api.ListHostedZonesByName(context.TODO(), &req)
 		if err != nil {
 			return "", err
 		}
@@ -149,7 +171,7 @@ func (c *route53Client) findZoneID(name string) (string, error) {
 				return *zone.Id, nil
 			}
 		}
-		if !*resp.IsTruncated {
+		if !resp.IsTruncated {
 			break
 		}
 		req.DNSName = resp.NextDNSName
@@ -159,15 +181,15 @@ func (c *route53Client) findZoneID(name string) (string, error) {
 }
 
 // computeChanges creates DNS changes for the given record.
-func (c *route53Client) computeChanges(name string, records map[string]string, existing map[string]recordSet) []*route53.Change {
+func (c *route53Client) computeChanges(name string, records map[string]string, existing map[string]recordSet) []types.Change {
 	// Convert all names to lowercase.
 	lrecords := make(map[string]string, len(records))
 	for name, r := range records {
 		lrecords[strings.ToLower(name)] = r
 	}
 	records = lrecords
 
-	var changes []*route53.Change
+	var changes []types.Change
 	for path, val := range records {
 		ttl := int64(rootTTL)
 		if path != name {
@@ -204,21 +226,21 @@ func (c *route53Client) computeChanges(name string, records map[string]string, existing map[string]recordSet) []types.Change {
 }
 
 // sortChanges ensures DNS changes are in leaf-added -> root-changed -> leaf-deleted order.
-func sortChanges(changes []*route53.Change) {
+func sortChanges(changes []types.Change) {
 	score := map[string]int{"CREATE": 1, "UPSERT": 2, "DELETE": 3}
 	sort.Slice(changes, func(i, j int) bool {
-		if *changes[i].Action == *changes[j].Action {
+		if changes[i].Action == changes[j].Action {
 			return *changes[i].ResourceRecordSet.Name < *changes[j].ResourceRecordSet.Name
 		}
-		return score[*changes[i].Action] < score[*changes[j].Action]
+		return score[string(changes[i].Action)] < score[string(changes[j].Action)]
 	})
 }
 
 // splitChanges splits up DNS changes such that each change batch
 // is smaller than the given RDATA limit.
-func splitChanges(changes []*route53.Change, sizeLimit, countLimit int) [][]*route53.Change {
+func splitChanges(changes []types.Change, sizeLimit, countLimit int) [][]types.Change {
 	var (
-		batches    [][]*route53.Change
+		batches    [][]types.Change
 		batchSize  int
 		batchCount int
 	)
@@ -241,7 +263,7 @@ func splitChanges(changes []types.Change, sizeLimit, countLimit int) [][]types.Change {
 }
 
 // changeSize returns the RDATA size of a DNS change.
-func changeSize(ch *route53.Change) int {
+func changeSize(ch types.Change) int {
 	size := 0
 	for _, rr := range ch.ResourceRecordSet.ResourceRecords {
 		if rr.Value != nil {
@@ -251,8 +273,8 @@ func changeSize(ch types.Change) int {
 	return size
 }
 
-func changeCount(ch *route53.Change) int {
-	if *ch.Action == "UPSERT" {
+func changeCount(ch types.Change) int {
+	if ch.Action == types.ChangeActionUpsert {
 		return 2
 	}
 	return 1
@@ -262,42 +284,58 @@ func changeCount(ch types.Change) int {
 func (c *route53Client) collectRecords(name string) (map[string]recordSet, error) {
 	log.Info(fmt.Sprintf("Retrieving existing TXT records on %s (%s)", name, c.zoneID))
 	var req route53.ListResourceRecordSetsInput
-	req.SetHostedZoneId(c.zoneID)
+	req.HostedZoneId = &c.zoneID
 	existing := make(map[string]recordSet)
-	err := c.api.ListResourceRecordSetsPages(&req, func(resp *route53.ListResourceRecordSetsOutput, last bool) bool {
+	for {
+		resp, err := c.api.ListResourceRecordSets(context.TODO(), &req)
+		if err != nil {
+			return existing, err
+		}
+
 		for _, set := range resp.ResourceRecordSets {
-			if !isSubdomain(*set.Name, name) || *set.Type != "TXT" {
+			if !isSubdomain(*set.Name, name) || set.Type != types.RRTypeTxt {
 				continue
 			}
+
 			s := recordSet{ttl: *set.TTL}
 			for _, rec := range set.ResourceRecords {
 				s.values = append(s.values, *rec.Value)
 			}
 			name := strings.TrimSuffix(*set.Name, ".")
 			existing[name] = s
 		}
-		return true
-	})
-	return existing, err
+
+		if !resp.IsTruncated {
+			break
+		}
+
+		// sets the cursor to the next batch
+		req.StartRecordIdentifier = resp.NextRecordIdentifier
+	}
+
+	return existing, nil
 }
 
 // newTXTChange creates a change to a TXT record.
-func newTXTChange(action, name string, ttl int64, values ...string) *route53.Change {
-	var c route53.Change
-	var r route53.ResourceRecordSet
-	var rrs []*route53.ResourceRecord
+func newTXTChange(action, name string, ttl int64, values ...string) types.Change {
+	r := types.ResourceRecordSet{
+		Type: types.RRTypeTxt,
+		Name: &name,
+		TTL:  &ttl,
+	}
+	var rrs []types.ResourceRecord
 	for _, val := range values {
-		rr := new(route53.ResourceRecord)
-		rr.SetValue(val)
+		var rr types.ResourceRecord
+		rr.Value = aws.String(val)
 		rrs = append(rrs, rr)
 	}
-	r.SetType("TXT")
-	r.SetName(name)
-	r.SetTTL(ttl)
-	r.SetResourceRecords(rrs)
-	c.SetAction(action)
-	c.SetResourceRecordSet(&r)
-	return &c
+
+	r.ResourceRecords = rrs
+
+	return types.Change{
+		Action:            types.ChangeAction(action),
+		ResourceRecordSet: &r,
+	}
 }
 
 // isSubdomain returns true if name is a subdomain of domain.

Review comment (on the pagination loop in collectRecords): Callback paginators are no longer implemented in v2; it uses cursors instead. This refactor follows the same logical flow as an already existing pagination implementation (compare findZoneID above).
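For reference, here is the cursor pattern lifted out of the diff into a self-contained sketch against the v2 Route53 API, assuming the SDK version used in this PR (where IsTruncated is a plain bool); the function name and zone ID are placeholders for illustration:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/route53"
)

// listAllRecordSets pages through every record set in a hosted zone using
// the v2 cursor fields: each truncated response carries Next* values that
// are copied into the next request's Start* fields.
func listAllRecordSets(ctx context.Context, api *route53.Client, zoneID string) ([]string, error) {
	req := &route53.ListResourceRecordSetsInput{HostedZoneId: &zoneID}
	var names []string
	for {
		resp, err := api.ListResourceRecordSets(ctx, req)
		if err != nil {
			return nil, err
		}
		for _, set := range resp.ResourceRecordSets {
			names = append(names, *set.Name)
		}
		if !resp.IsTruncated {
			return names, nil
		}
		// Advance the cursor. Route53 returns name, type, and identifier
		// cursors; carrying all of them over resumes where the page ended.
		req.StartRecordName = resp.NextRecordName
		req.StartRecordType = resp.NextRecordType
		req.StartRecordIdentifier = resp.NextRecordIdentifier
	}
}

func main() {
	cfg, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	// "Z0EXAMPLE" is a placeholder hosted-zone ID for this sketch.
	names, err := listAllRecordSets(context.Background(), route53.NewFromConfig(cfg), "Z0EXAMPLE")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(names)
}
```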
Review discussion (on the deploy wait loop):

Comment: This loop can be optimized, mostly because the waiter is blocking (this is the case in the current version as well as in this PR). Currently, for every batch, the code makes the change and then waits up to 30 minutes for it to propagate before making the change for the next batch. In a worst-case scenario, this loop could block a command for hours. My proposal is one of the following (a sketch of option 1 follows this thread):

1. Loop through the batches and make all of the changes, then block until all changes have propagated. This way the bottleneck exists only on the AWS side: how fast changes are propagated.
2. Create a goroutine for each batch, so that each routine is responsible for making its change and then waiting for the propagation. The main thread would block until all of the routines have broken out of the wait loop. Like option 1, the bottleneck would exist only on the AWS side.

Option 2 would be more elegant; both options would be a dramatic optimization. However, I am not clear on whether this is necessary or desired. If it is, I can include those changes in this PR or a different one (I can also implement the optimization with the v1 SDK).

Reply (fjl): If you want to make this change, I prefer option 1. There can be a lot of changes, and starting a goroutine for each has its own problems.

Reply: @fjl I have implemented option 1. Propagation takes an average of 60 seconds, according to AWS, so instead of waiting ~60-90 seconds per change, ideally it now takes 60-90 seconds for the first change, and the rest of the waiters should succeed on their first request.
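For concreteness, here is a minimal sketch of the option 1 shape discussed above; submit and isPropagated are hypothetical stand-ins for ChangeResourceRecordSets and the GetChange poll, and the timings are illustrative:

```go
package main

import (
	"fmt"
	"time"
)

// submitThenWait submits every batch first, then waits on all of them,
// so propagation delays overlap instead of adding up per batch.
func submitThenWait(batches [][]string, submit func([]string) (string, error), isPropagated func(string) (bool, error)) error {
	// Phase 1: fire off all change batches up front.
	ids := make([]string, 0, len(batches))
	for _, b := range batches {
		id, err := submit(b)
		if err != nil {
			return err
		}
		ids = append(ids, id)
	}
	// Phase 2: poll each change. Once the first is in sync, the rest have
	// usually propagated too, so later polls succeed on the first attempt.
	for _, id := range ids {
		for {
			done, err := isPropagated(id)
			if err != nil {
				return err
			}
			if done {
				break
			}
			time.Sleep(30 * time.Second)
		}
	}
	return nil
}

func main() {
	submit := func(b []string) (string, error) { return fmt.Sprintf("change-%d", len(b)), nil }
	isPropagated := func(id string) (bool, error) { return true, nil } // instant in this toy example
	fmt.Println(submitThenWait([][]string{{"a"}, {"b", "c"}}, submit, isPropagated))
}
```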