Skip to content

Commit

Permalink
[#570]: chore: clean up authorization/environment logic
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Oct 20, 2024
2 parents e32b815 + ba071d2 commit 01a82a8
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 221 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ toolchain go1.23.2

require (
github.com/aws/aws-sdk-go-v2 v1.32.2
github.com/aws/aws-sdk-go-v2/config v1.27.43
github.com/aws/aws-sdk-go-v2/config v1.28.0
github.com/aws/aws-sdk-go-v2/credentials v1.17.41
github.com/aws/aws-sdk-go-v2/service/sqs v1.36.2
github.com/aws/smithy-go v1.22.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
github.com/aws/aws-sdk-go-v2 v1.32.2 h1:AkNLZEyYMLnx/Q/mSKkcMqwNFXMAvFto9bNsHqcTduI=
github.com/aws/aws-sdk-go-v2 v1.32.2/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo=
github.com/aws/aws-sdk-go-v2/config v1.27.43 h1:p33fDDihFC390dhhuv8nOmX419wjOSDQRb+USt20RrU=
github.com/aws/aws-sdk-go-v2/config v1.27.43/go.mod h1:pYhbtvg1siOOg8h5an77rXle9tVG8T+BWLWAo7cOukc=
github.com/aws/aws-sdk-go-v2/config v1.28.0 h1:FosVYWcqEtWNxHn8gB/Vs6jOlNwSoyOCA/g/sxyySOQ=
github.com/aws/aws-sdk-go-v2/config v1.28.0/go.mod h1:pYhbtvg1siOOg8h5an77rXle9tVG8T+BWLWAo7cOukc=
github.com/aws/aws-sdk-go-v2/credentials v1.17.41 h1:7gXo+Axmp+R4Z+AK8YFQO0ZV3L0gizGINCOWxSLY9W8=
github.com/aws/aws-sdk-go-v2/credentials v1.17.41/go.mod h1:u4Eb8d3394YLubphT4jLEwN1rLNq2wFOlT6OuxFwPzU=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 h1:TMH3f/SCAWdNtXXVPPu5D6wrr4G5hI1rAxbcocKfC7Q=
Expand Down
4 changes: 0 additions & 4 deletions sqsjobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,6 @@ type Config struct {
}

func (c *Config) InitDefault() {
if c.Endpoint == "" {
c.Endpoint = "http://127.0.0.1:9324"
}

if c.Queue == nil {
c.Queue = aws.String("default")
}
Expand Down
165 changes: 40 additions & 125 deletions sqsjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package sqsjobs
import (
"context"
stderr "errors"
"net/http"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -27,11 +26,9 @@ import (
)

const (
pluginName string = "sqs"
tracerName string = "jobs"
awsMetaDataURL string = "http://169.254.169.254/latest/dynamic/instance-identity/"
awsMetaDataIMDSv2URL string = "http://169.254.169.254/latest/api/token"
awsTokenHeader string = "X-aws-ec2-metadata-token-ttl-seconds" //nolint:gosec
pluginName string = "sqs"
tracerName string = "jobs"
assumeAWSEnv string = "No sqs plugin configuration section was found; assuming we're in AWS environment and want to use default values."
)

var _ jobs.Driver = (*Driver)(nil)
Expand Down Expand Up @@ -82,26 +79,12 @@ type Driver struct {

func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_sqs_consumer")
/*
we need to determine in what environment we are running
1. Non-AWS - global sqs config should be set
2. AWS - configuration should be obtained from the env, but with the ability to override them with the global config
*/
var insideAWS bool
if isInAWS() || isinAWSIMDSv2() {
insideAWS = true
}

// if no such key - error
if !cfg.Has(configKey) {
return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
}

// if no global section - try to fetch IAM creds
if !cfg.Has(pluginName) && !insideAWS {
return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section"))
}

if tracer == nil {
tracer = sdktrace.NewTracerProvider()
}
Expand All @@ -122,6 +105,8 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip
if err != nil {
return nil, errors.E(op, err)
}
} else {
log.Info(assumeAWSEnv)
}

conf.InitDefault()
Expand All @@ -147,7 +132,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip
}

// PARSE CONFIGURATION -------
jb.client, err = checkEnv(insideAWS, conf.Key, conf.Secret, conf.SessionToken, conf.Endpoint, conf.Region)
jb.client, err = checkEnv(conf.Key, conf.Secret, conf.SessionToken, conf.Endpoint, conf.Region)
if err != nil {
return nil, errors.E(op, err)
}
Expand All @@ -174,21 +159,6 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip
func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_sqs_consumer")

/*
we need to determine in what environment we are running
1. Non-AWS - global sqs config should be set
2. AWS - configuration should be obtained from the env
*/
var insideAWS bool
if isInAWS() || isinAWSIMDSv2() {
insideAWS = true
}

// if no global section
if !cfg.Has(pluginName) && !insideAWS {
return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section"))
}

if tracer == nil {
tracer = sdktrace.NewTracerProvider()
}
Expand All @@ -205,6 +175,8 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
if err != nil {
return nil, errors.E(op, err)
}
} else {
log.Info(assumeAWSEnv)
}

conf.InitDefault()
Expand Down Expand Up @@ -243,7 +215,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.

// PARSE CONFIGURATION -------

jb.client, err = checkEnv(insideAWS, conf.Key, conf.Secret, conf.SessionToken, conf.Endpoint, conf.Region)
jb.client, err = checkEnv(conf.Key, conf.Secret, conf.SessionToken, conf.Endpoint, conf.Region)
if err != nil {
return nil, errors.E(op, err)
}
Expand Down Expand Up @@ -473,53 +445,45 @@ func (c *Driver) handleItem(ctx context.Context, msg *Item) error {
return nil
}

func checkEnv(insideAWS bool, key, secret, sessionToken, endpoint, region string) (*sqs.Client, error) {
func checkEnv(key, secret, sessionToken, endpoint, region string) (*sqs.Client, error) {
const op = errors.Op("check_env")
var client *sqs.Client
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

switch insideAWS {
case true:
// respect user provided values for the sqs
opts := make([]func(*config.LoadOptions) error, 0, 1)
if region != "" {
opts = append(opts, config.WithRegion(region))
}
if secret != "" && key != "" && sessionToken != "" {
opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(key, secret, sessionToken)))
}
opts := make([]func(*config.LoadOptions) error, 0, 1)

awsConf, err := config.LoadDefaultConfig(ctx, opts...)
if err != nil {
return nil, errors.E(op, err)
}
if region != "" {
opts = append(opts, config.WithRegion(region))
}

// config with retries
client = sqs.NewFromConfig(awsConf, func(o *sqs.Options) {
o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
opts.MaxAttempts = 60
opts.MaxBackoff = time.Second * 2
})
})
case false:
awsConf, err := config.LoadDefaultConfig(ctx,
config.WithRegion(region),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(key, secret, sessionToken)))
if err != nil {
return nil, errors.E(op, err)
}
// session_token is optional; if no credentials are provided, we assume user wants to default to AWS IAM.
if secret != "" && key != "" {
opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(key, secret, sessionToken)))
}

// config with retries
client = sqs.NewFromConfig(awsConf, func(o *sqs.Options) {
o.BaseEndpoint = &endpoint
o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
opts.MaxAttempts = 60
opts.MaxBackoff = time.Second * 2
})
})
if endpoint != "" {
// Setting the endpoint is only necessary when self-hosting the queue, as region (either from AWS env or
// set in rr config) will tell us which AWS SQS endpoint to use.
opts = append(opts, config.WithBaseEndpoint(endpoint))
}

// Load default credentials; if in AWS context, this will auth as the associated IAM role.
// We override this config with user-provided values, if any.
awsConf, err := config.LoadDefaultConfig(ctx, opts...)

if err != nil {
return nil, errors.E(op, err)
}

// config with retries
client = sqs.NewFromConfig(awsConf, func(o *sqs.Options) {
o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
opts.MaxAttempts = 60
opts.MaxBackoff = time.Second * 2
})
})

return client, nil
}

Expand All @@ -541,64 +505,15 @@ func manageQueue(jb *Driver) error {
return nil
}

// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/identify_ec2_instances.html
func isInAWS() bool {
client := &http.Client{
Timeout: time.Second * 2,
}
resp, err := client.Get(awsMetaDataURL) //nolint:noctx
if err != nil {
return false
}

_ = resp.Body.Close()

return resp.StatusCode == http.StatusOK
}

// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
func isinAWSIMDSv2() bool {
client := &http.Client{
Timeout: time.Second * 2,
}

// probably we're in the IMDSv2, let's try different endpoint
req, err := http.NewRequestWithContext(context.Background(), http.MethodPut, awsMetaDataIMDSv2URL, nil)
if err != nil {
return false
}

// 10 seconds should be fine to just check
req.Header.Set(awsTokenHeader, "10")

resp, err := client.Do(req)
if err != nil {
return false
}

_ = resp.Body.Close()

return resp.StatusCode == http.StatusOK
}

func createQueue(client *sqs.Client, queueName *string, attributes map[string]string, tags map[string]string) (*string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
out, err := client.CreateQueue(ctx, &sqs.CreateQueueInput{QueueName: queueName, Attributes: attributes, Tags: tags})
if err != nil {
var qErr *types.QueueNameExists
if stderr.As(err, &qErr) {
ctxGet, cancelGet := context.WithTimeout(context.Background(), time.Second*30)
defer cancelGet()
res, errQ := client.GetQueueUrl(ctxGet, &sqs.GetQueueUrlInput{
QueueName: queueName,
}, func(_ *sqs.Options) {})
if errQ != nil {
return nil, errQ
}

return res.QueueUrl, nil
// Queue already exists; return existing URL instead.
return getQueueURL(client, queueName)
}

return nil, err
Expand Down
42 changes: 0 additions & 42 deletions tests/configs/.rr-no-global.yaml

This file was deleted.

12 changes: 6 additions & 6 deletions tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ go 1.23
toolchain go1.23.2

require (
github.com/Shopify/toxiproxy/v2 v2.10.0
github.com/Shopify/toxiproxy/v2 v2.11.0
github.com/aws/aws-sdk-go-v2 v1.32.2
github.com/aws/aws-sdk-go-v2/config v1.27.43
github.com/aws/aws-sdk-go-v2/config v1.28.0
github.com/aws/aws-sdk-go-v2/credentials v1.17.41
github.com/aws/aws-sdk-go-v2/service/sqs v1.36.2
github.com/goccy/go-json v0.10.3
Expand All @@ -23,7 +23,7 @@ require (
github.com/roadrunner-server/resetter/v5 v5.0.5
github.com/roadrunner-server/rpc/v5 v5.0.4
github.com/roadrunner-server/server/v5 v5.1.2
github.com/roadrunner-server/sqs/v5 v5.0.0
github.com/roadrunner-server/sqs/v5 v5.0.4
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
)
Expand Down Expand Up @@ -68,7 +68,7 @@ require (
github.com/pborman/uuid v1.2.1 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.20.4 // indirect
github.com/prometheus/client_golang v1.20.5 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.60.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
Expand Down Expand Up @@ -116,8 +116,8 @@ require (
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/time v0.7.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect
google.golang.org/grpc v1.67.1 // indirect
google.golang.org/protobuf v1.35.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
Loading

0 comments on commit 01a82a8

Please sign in to comment.