Skip to content

Commit

Permalink
Remove EC2 detection; rely on LoadDefaultConfig
Browse files Browse the repository at this point in the history
Deduplicate checkEnv code
Deduplicate GetQueueUrl code
Default to empty endpoint
Update dependencies, use WithBaseEndpoint
Remove "no global section" test
Add log statement for AWS IAM config assumption
  • Loading branch information
nickdnk committed Oct 20, 2024
1 parent e32b815 commit ba071d2
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 221 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ toolchain go1.23.2

require (
github.com/aws/aws-sdk-go-v2 v1.32.2
github.com/aws/aws-sdk-go-v2/config v1.27.43
github.com/aws/aws-sdk-go-v2/config v1.28.0
github.com/aws/aws-sdk-go-v2/credentials v1.17.41
github.com/aws/aws-sdk-go-v2/service/sqs v1.36.2
github.com/aws/smithy-go v1.22.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
github.com/aws/aws-sdk-go-v2 v1.32.2 h1:AkNLZEyYMLnx/Q/mSKkcMqwNFXMAvFto9bNsHqcTduI=
github.com/aws/aws-sdk-go-v2 v1.32.2/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo=
github.com/aws/aws-sdk-go-v2/config v1.27.43 h1:p33fDDihFC390dhhuv8nOmX419wjOSDQRb+USt20RrU=
github.com/aws/aws-sdk-go-v2/config v1.27.43/go.mod h1:pYhbtvg1siOOg8h5an77rXle9tVG8T+BWLWAo7cOukc=
github.com/aws/aws-sdk-go-v2/config v1.28.0 h1:FosVYWcqEtWNxHn8gB/Vs6jOlNwSoyOCA/g/sxyySOQ=
github.com/aws/aws-sdk-go-v2/config v1.28.0/go.mod h1:pYhbtvg1siOOg8h5an77rXle9tVG8T+BWLWAo7cOukc=
github.com/aws/aws-sdk-go-v2/credentials v1.17.41 h1:7gXo+Axmp+R4Z+AK8YFQO0ZV3L0gizGINCOWxSLY9W8=
github.com/aws/aws-sdk-go-v2/credentials v1.17.41/go.mod h1:u4Eb8d3394YLubphT4jLEwN1rLNq2wFOlT6OuxFwPzU=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 h1:TMH3f/SCAWdNtXXVPPu5D6wrr4G5hI1rAxbcocKfC7Q=
Expand Down
4 changes: 0 additions & 4 deletions sqsjobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,6 @@ type Config struct {
}

func (c *Config) InitDefault() {
if c.Endpoint == "" {
c.Endpoint = "http://127.0.0.1:9324"
}

if c.Queue == nil {
c.Queue = aws.String("default")
}
Expand Down
165 changes: 40 additions & 125 deletions sqsjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package sqsjobs
import (
"context"
stderr "errors"
"net/http"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -27,11 +26,9 @@ import (
)

const (
pluginName string = "sqs"
tracerName string = "jobs"
awsMetaDataURL string = "http://169.254.169.254/latest/dynamic/instance-identity/"
awsMetaDataIMDSv2URL string = "http://169.254.169.254/latest/api/token"
awsTokenHeader string = "X-aws-ec2-metadata-token-ttl-seconds" //nolint:gosec
pluginName string = "sqs"
tracerName string = "jobs"
assumeAWSEnv string = "No sqs plugin configuration section was found; assuming we're in AWS environment and want to use default values."
)

var _ jobs.Driver = (*Driver)(nil)
Expand Down Expand Up @@ -82,26 +79,12 @@ type Driver struct {

func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_sqs_consumer")
/*
we need to determine in what environment we are running
1. Non-AWS - global sqs config should be set
2. AWS - configuration should be obtained from the env, but with the ability to override them with the global config
*/
var insideAWS bool
if isInAWS() || isinAWSIMDSv2() {
insideAWS = true
}

// if no such key - error
if !cfg.Has(configKey) {
return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
}

// if no global section - try to fetch IAM creds
if !cfg.Has(pluginName) && !insideAWS {
return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section"))
}

if tracer == nil {
tracer = sdktrace.NewTracerProvider()
}
Expand All @@ -122,6 +105,8 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip
if err != nil {
return nil, errors.E(op, err)
}
} else {
log.Info(assumeAWSEnv)
}

conf.InitDefault()
Expand All @@ -147,7 +132,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip
}

// PARSE CONFIGURATION -------
jb.client, err = checkEnv(insideAWS, conf.Key, conf.Secret, conf.SessionToken, conf.Endpoint, conf.Region)
jb.client, err = checkEnv(conf.Key, conf.Secret, conf.SessionToken, conf.Endpoint, conf.Region)
if err != nil {
return nil, errors.E(op, err)
}
Expand All @@ -174,21 +159,6 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip
func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_sqs_consumer")

/*
we need to determine in what environment we are running
1. Non-AWS - global sqs config should be set
2. AWS - configuration should be obtained from the env
*/
var insideAWS bool
if isInAWS() || isinAWSIMDSv2() {
insideAWS = true
}

// if no global section
if !cfg.Has(pluginName) && !insideAWS {
return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section"))
}

if tracer == nil {
tracer = sdktrace.NewTracerProvider()
}
Expand All @@ -205,6 +175,8 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
if err != nil {
return nil, errors.E(op, err)
}
} else {
log.Info(assumeAWSEnv)
}

conf.InitDefault()
Expand Down Expand Up @@ -243,7 +215,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.

// PARSE CONFIGURATION -------

jb.client, err = checkEnv(insideAWS, conf.Key, conf.Secret, conf.SessionToken, conf.Endpoint, conf.Region)
jb.client, err = checkEnv(conf.Key, conf.Secret, conf.SessionToken, conf.Endpoint, conf.Region)
if err != nil {
return nil, errors.E(op, err)
}
Expand Down Expand Up @@ -473,53 +445,45 @@ func (c *Driver) handleItem(ctx context.Context, msg *Item) error {
return nil
}

func checkEnv(insideAWS bool, key, secret, sessionToken, endpoint, region string) (*sqs.Client, error) {
func checkEnv(key, secret, sessionToken, endpoint, region string) (*sqs.Client, error) {
const op = errors.Op("check_env")
var client *sqs.Client
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

switch insideAWS {
case true:
// respect user provided values for the sqs
opts := make([]func(*config.LoadOptions) error, 0, 1)
if region != "" {
opts = append(opts, config.WithRegion(region))
}
if secret != "" && key != "" && sessionToken != "" {
opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(key, secret, sessionToken)))
}
opts := make([]func(*config.LoadOptions) error, 0, 1)

awsConf, err := config.LoadDefaultConfig(ctx, opts...)
if err != nil {
return nil, errors.E(op, err)
}
if region != "" {
opts = append(opts, config.WithRegion(region))
}

// config with retries
client = sqs.NewFromConfig(awsConf, func(o *sqs.Options) {
o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
opts.MaxAttempts = 60
opts.MaxBackoff = time.Second * 2
})
})
case false:
awsConf, err := config.LoadDefaultConfig(ctx,
config.WithRegion(region),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(key, secret, sessionToken)))
if err != nil {
return nil, errors.E(op, err)
}
// session_token is optional; if no credentials are provided, we assume user wants to default to AWS IAM.
if secret != "" && key != "" {
opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(key, secret, sessionToken)))
}

// config with retries
client = sqs.NewFromConfig(awsConf, func(o *sqs.Options) {
o.BaseEndpoint = &endpoint
o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
opts.MaxAttempts = 60
opts.MaxBackoff = time.Second * 2
})
})
if endpoint != "" {
// Setting the endpoint is only necessary when self-hosting the queue, as region (either from AWS env or
// set in rr config) will tell us which AWS SQS endpoint to use.
opts = append(opts, config.WithBaseEndpoint(endpoint))
}

// Load default credentials; if in AWS context, this will auth as the associated IAM role.
// We override this config with user-provided values, if any.
awsConf, err := config.LoadDefaultConfig(ctx, opts...)

if err != nil {
return nil, errors.E(op, err)
}

// config with retries
client = sqs.NewFromConfig(awsConf, func(o *sqs.Options) {
o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
opts.MaxAttempts = 60
opts.MaxBackoff = time.Second * 2
})
})

return client, nil
}

Expand All @@ -541,64 +505,15 @@ func manageQueue(jb *Driver) error {
return nil
}

// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/identify_ec2_instances.html
func isInAWS() bool {
client := &http.Client{
Timeout: time.Second * 2,
}
resp, err := client.Get(awsMetaDataURL) //nolint:noctx
if err != nil {
return false
}

_ = resp.Body.Close()

return resp.StatusCode == http.StatusOK
}

// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
func isinAWSIMDSv2() bool {
client := &http.Client{
Timeout: time.Second * 2,
}

// probably we're in the IMDSv2, let's try different endpoint
req, err := http.NewRequestWithContext(context.Background(), http.MethodPut, awsMetaDataIMDSv2URL, nil)
if err != nil {
return false
}

// 10 seconds should be fine to just check
req.Header.Set(awsTokenHeader, "10")

resp, err := client.Do(req)
if err != nil {
return false
}

_ = resp.Body.Close()

return resp.StatusCode == http.StatusOK
}

func createQueue(client *sqs.Client, queueName *string, attributes map[string]string, tags map[string]string) (*string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
out, err := client.CreateQueue(ctx, &sqs.CreateQueueInput{QueueName: queueName, Attributes: attributes, Tags: tags})
if err != nil {
var qErr *types.QueueNameExists
if stderr.As(err, &qErr) {
ctxGet, cancelGet := context.WithTimeout(context.Background(), time.Second*30)
defer cancelGet()
res, errQ := client.GetQueueUrl(ctxGet, &sqs.GetQueueUrlInput{
QueueName: queueName,
}, func(_ *sqs.Options) {})
if errQ != nil {
return nil, errQ
}

return res.QueueUrl, nil
// Queue already exists; return existing URL instead.
return getQueueURL(client, queueName)
}

return nil, err
Expand Down
42 changes: 0 additions & 42 deletions tests/configs/.rr-no-global.yaml

This file was deleted.

12 changes: 6 additions & 6 deletions tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ go 1.23
toolchain go1.23.2

require (
github.com/Shopify/toxiproxy/v2 v2.10.0
github.com/Shopify/toxiproxy/v2 v2.11.0
github.com/aws/aws-sdk-go-v2 v1.32.2
github.com/aws/aws-sdk-go-v2/config v1.27.43
github.com/aws/aws-sdk-go-v2/config v1.28.0
github.com/aws/aws-sdk-go-v2/credentials v1.17.41
github.com/aws/aws-sdk-go-v2/service/sqs v1.36.2
github.com/goccy/go-json v0.10.3
Expand All @@ -23,7 +23,7 @@ require (
github.com/roadrunner-server/resetter/v5 v5.0.5
github.com/roadrunner-server/rpc/v5 v5.0.4
github.com/roadrunner-server/server/v5 v5.1.2
github.com/roadrunner-server/sqs/v5 v5.0.0
github.com/roadrunner-server/sqs/v5 v5.0.4
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
)
Expand Down Expand Up @@ -68,7 +68,7 @@ require (
github.com/pborman/uuid v1.2.1 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.20.4 // indirect
github.com/prometheus/client_golang v1.20.5 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.60.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
Expand Down Expand Up @@ -116,8 +116,8 @@ require (
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/time v0.7.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect
google.golang.org/grpc v1.67.1 // indirect
google.golang.org/protobuf v1.35.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
Loading

0 comments on commit ba071d2

Please sign in to comment.