diff --git a/go.mod b/go.mod index 6a97936..32a58c5 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ toolchain go1.23.2 require ( github.com/aws/aws-sdk-go-v2 v1.32.2 - github.com/aws/aws-sdk-go-v2/config v1.27.43 + github.com/aws/aws-sdk-go-v2/config v1.28.0 github.com/aws/aws-sdk-go-v2/credentials v1.17.41 github.com/aws/aws-sdk-go-v2/service/sqs v1.36.2 github.com/aws/smithy-go v1.22.0 diff --git a/go.sum b/go.sum index 3ee1746..3dd5372 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ github.com/aws/aws-sdk-go-v2 v1.32.2 h1:AkNLZEyYMLnx/Q/mSKkcMqwNFXMAvFto9bNsHqcTduI= github.com/aws/aws-sdk-go-v2 v1.32.2/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo= -github.com/aws/aws-sdk-go-v2/config v1.27.43 h1:p33fDDihFC390dhhuv8nOmX419wjOSDQRb+USt20RrU= -github.com/aws/aws-sdk-go-v2/config v1.27.43/go.mod h1:pYhbtvg1siOOg8h5an77rXle9tVG8T+BWLWAo7cOukc= +github.com/aws/aws-sdk-go-v2/config v1.28.0 h1:FosVYWcqEtWNxHn8gB/Vs6jOlNwSoyOCA/g/sxyySOQ= +github.com/aws/aws-sdk-go-v2/config v1.28.0/go.mod h1:pYhbtvg1siOOg8h5an77rXle9tVG8T+BWLWAo7cOukc= github.com/aws/aws-sdk-go-v2/credentials v1.17.41 h1:7gXo+Axmp+R4Z+AK8YFQO0ZV3L0gizGINCOWxSLY9W8= github.com/aws/aws-sdk-go-v2/credentials v1.17.41/go.mod h1:u4Eb8d3394YLubphT4jLEwN1rLNq2wFOlT6OuxFwPzU= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 h1:TMH3f/SCAWdNtXXVPPu5D6wrr4G5hI1rAxbcocKfC7Q= diff --git a/sqsjobs/config.go b/sqsjobs/config.go index 65dfdc6..b99fb77 100644 --- a/sqsjobs/config.go +++ b/sqsjobs/config.go @@ -109,10 +109,6 @@ type Config struct { } func (c *Config) InitDefault() { - if c.Endpoint == "" { - c.Endpoint = "http://127.0.0.1:9324" - } - if c.Queue == nil { c.Queue = aws.String("default") } diff --git a/sqsjobs/driver.go b/sqsjobs/driver.go index 16f6268..9b6058f 100644 --- a/sqsjobs/driver.go +++ b/sqsjobs/driver.go @@ -3,7 +3,6 @@ package sqsjobs import ( "context" stderr "errors" - "net/http" "strconv" "sync" "sync/atomic" @@ -27,11 +26,9 @@ import ( ) const ( - pluginName string = "sqs" - tracerName string = "jobs" - awsMetaDataURL string = "http://169.254.169.254/latest/dynamic/instance-identity/" - awsMetaDataIMDSv2URL string = "http://169.254.169.254/latest/api/token" - awsTokenHeader string = "X-aws-ec2-metadata-token-ttl-seconds" //nolint:gosec + pluginName string = "sqs" + tracerName string = "jobs" + assumeAWSEnv string = "No sqs plugin configuration section was found; assuming we're in AWS environment and want to use default values." ) var _ jobs.Driver = (*Driver)(nil) @@ -82,26 +79,12 @@ type Driver struct { func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) { const op = errors.Op("new_sqs_consumer") - /* - we need to determine in what environment we are running - 1. Non-AWS - global sqs config should be set - 2. AWS - configuration should be obtained from the env, but with the ability to override them with the global config - */ - var insideAWS bool - if isInAWS() || isinAWSIMDSv2() { - insideAWS = true - } // if no such key - error if !cfg.Has(configKey) { return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) } - // if no global section - try to fetch IAM creds - if !cfg.Has(pluginName) && !insideAWS { - return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section")) - } - if tracer == nil { tracer = sdktrace.NewTracerProvider() } @@ -122,6 +105,8 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip if err != nil { return nil, errors.E(op, err) } + } else { + log.Info(assumeAWSEnv) } conf.InitDefault() @@ -147,7 +132,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip } // PARSE CONFIGURATION ------- - jb.client, err = checkEnv(insideAWS, conf.Key, conf.Secret, conf.SessionToken, conf.Endpoint, conf.Region) + jb.client, err = checkEnv(conf.Key, conf.Secret, conf.SessionToken, conf.Endpoint, conf.Region) if err != nil { return nil, errors.E(op, err) } @@ -174,21 +159,6 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) { const op = errors.Op("new_sqs_consumer") - /* - we need to determine in what environment we are running - 1. Non-AWS - global sqs config should be set - 2. AWS - configuration should be obtained from the env - */ - var insideAWS bool - if isInAWS() || isinAWSIMDSv2() { - insideAWS = true - } - - // if no global section - if !cfg.Has(pluginName) && !insideAWS { - return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section")) - } - if tracer == nil { tracer = sdktrace.NewTracerProvider() } @@ -205,6 +175,8 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap. if err != nil { return nil, errors.E(op, err) } + } else { + log.Info(assumeAWSEnv) } conf.InitDefault() @@ -243,7 +215,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap. // PARSE CONFIGURATION ------- - jb.client, err = checkEnv(insideAWS, conf.Key, conf.Secret, conf.SessionToken, conf.Endpoint, conf.Region) + jb.client, err = checkEnv(conf.Key, conf.Secret, conf.SessionToken, conf.Endpoint, conf.Region) if err != nil { return nil, errors.E(op, err) } @@ -473,53 +445,45 @@ func (c *Driver) handleItem(ctx context.Context, msg *Item) error { return nil } -func checkEnv(insideAWS bool, key, secret, sessionToken, endpoint, region string) (*sqs.Client, error) { +func checkEnv(key, secret, sessionToken, endpoint, region string) (*sqs.Client, error) { const op = errors.Op("check_env") var client *sqs.Client ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() - switch insideAWS { - case true: - // respect user provided values for the sqs - opts := make([]func(*config.LoadOptions) error, 0, 1) - if region != "" { - opts = append(opts, config.WithRegion(region)) - } - if secret != "" && key != "" && sessionToken != "" { - opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(key, secret, sessionToken))) - } + opts := make([]func(*config.LoadOptions) error, 0, 1) - awsConf, err := config.LoadDefaultConfig(ctx, opts...) - if err != nil { - return nil, errors.E(op, err) - } + if region != "" { + opts = append(opts, config.WithRegion(region)) + } - // config with retries - client = sqs.NewFromConfig(awsConf, func(o *sqs.Options) { - o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) { - opts.MaxAttempts = 60 - opts.MaxBackoff = time.Second * 2 - }) - }) - case false: - awsConf, err := config.LoadDefaultConfig(ctx, - config.WithRegion(region), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(key, secret, sessionToken))) - if err != nil { - return nil, errors.E(op, err) - } + // session_token is optional; if no credentials are provided, we assume user wants to default to AWS IAM. + if secret != "" && key != "" { + opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(key, secret, sessionToken))) + } - // config with retries - client = sqs.NewFromConfig(awsConf, func(o *sqs.Options) { - o.BaseEndpoint = &endpoint - o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) { - opts.MaxAttempts = 60 - opts.MaxBackoff = time.Second * 2 - }) - }) + if endpoint != "" { + // Setting the endpoint is only necessary when self-hosting the queue, as region (either from AWS env or + // set in rr config) will tell us which AWS SQS endpoint to use. + opts = append(opts, config.WithBaseEndpoint(endpoint)) + } + + // Load default credentials; if in AWS context, this will auth as the associated IAM role. + // We override this config with user-provided values, if any. + awsConf, err := config.LoadDefaultConfig(ctx, opts...) + + if err != nil { + return nil, errors.E(op, err) } + // config with retries + client = sqs.NewFromConfig(awsConf, func(o *sqs.Options) { + o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) { + opts.MaxAttempts = 60 + opts.MaxBackoff = time.Second * 2 + }) + }) + return client, nil } @@ -541,47 +505,6 @@ func manageQueue(jb *Driver) error { return nil } -// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html -// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/identify_ec2_instances.html -func isInAWS() bool { - client := &http.Client{ - Timeout: time.Second * 2, - } - resp, err := client.Get(awsMetaDataURL) //nolint:noctx - if err != nil { - return false - } - - _ = resp.Body.Close() - - return resp.StatusCode == http.StatusOK -} - -// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html -func isinAWSIMDSv2() bool { - client := &http.Client{ - Timeout: time.Second * 2, - } - - // probably we're in the IMDSv2, let's try different endpoint - req, err := http.NewRequestWithContext(context.Background(), http.MethodPut, awsMetaDataIMDSv2URL, nil) - if err != nil { - return false - } - - // 10 seconds should be fine to just check - req.Header.Set(awsTokenHeader, "10") - - resp, err := client.Do(req) - if err != nil { - return false - } - - _ = resp.Body.Close() - - return resp.StatusCode == http.StatusOK -} - func createQueue(client *sqs.Client, queueName *string, attributes map[string]string, tags map[string]string) (*string, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() @@ -589,16 +512,8 @@ func createQueue(client *sqs.Client, queueName *string, attributes map[string]st if err != nil { var qErr *types.QueueNameExists if stderr.As(err, &qErr) { - ctxGet, cancelGet := context.WithTimeout(context.Background(), time.Second*30) - defer cancelGet() - res, errQ := client.GetQueueUrl(ctxGet, &sqs.GetQueueUrlInput{ - QueueName: queueName, - }, func(_ *sqs.Options) {}) - if errQ != nil { - return nil, errQ - } - - return res.QueueUrl, nil + // Queue already exists; return existing URL instead. + return getQueueURL(client, queueName) } return nil, err diff --git a/tests/configs/.rr-no-global.yaml b/tests/configs/.rr-no-global.yaml deleted file mode 100644 index bba6d87..0000000 --- a/tests/configs/.rr-no-global.yaml +++ /dev/null @@ -1,42 +0,0 @@ -version: '3' - -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php php_test_files/jobs/jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: debug - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - pipelines: - test-1: - driver: sqs - config: - prefetch: 1000 - visibility_timeout: 0 - wait_time_seconds: 0 - queue: default - attributes: - DelaySeconds: 0 - MaximumMessageSize: 262144 - MessageRetentionPeriod: 345600 - ReceiveMessageWaitTimeSeconds: 0 - VisibilityTimeout: 30 - tags: - test: "tag" - - consume: [ "test-1" ] - diff --git a/tests/go.mod b/tests/go.mod index 4605569..75c34ea 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -5,9 +5,9 @@ go 1.23 toolchain go1.23.2 require ( - github.com/Shopify/toxiproxy/v2 v2.10.0 + github.com/Shopify/toxiproxy/v2 v2.11.0 github.com/aws/aws-sdk-go-v2 v1.32.2 - github.com/aws/aws-sdk-go-v2/config v1.27.43 + github.com/aws/aws-sdk-go-v2/config v1.28.0 github.com/aws/aws-sdk-go-v2/credentials v1.17.41 github.com/aws/aws-sdk-go-v2/service/sqs v1.36.2 github.com/goccy/go-json v0.10.3 @@ -23,7 +23,7 @@ require ( github.com/roadrunner-server/resetter/v5 v5.0.5 github.com/roadrunner-server/rpc/v5 v5.0.4 github.com/roadrunner-server/server/v5 v5.1.2 - github.com/roadrunner-server/sqs/v5 v5.0.0 + github.com/roadrunner-server/sqs/v5 v5.0.4 github.com/stretchr/testify v1.9.0 go.uber.org/zap v1.27.0 ) @@ -68,7 +68,7 @@ require ( github.com/pborman/uuid v1.2.1 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/client_golang v1.20.4 // indirect + github.com/prometheus/client_golang v1.20.5 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.60.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect @@ -116,8 +116,8 @@ require ( golang.org/x/sys v0.26.0 // indirect golang.org/x/text v0.19.0 // indirect golang.org/x/time v0.7.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect google.golang.org/grpc v1.67.1 // indirect google.golang.org/protobuf v1.35.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/tests/go.sum b/tests/go.sum index 154b2ca..73abdb6 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -1,11 +1,11 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/Shopify/toxiproxy/v2 v2.10.0 h1:TbWuVFTml2+D3P0adNEPgASlESRHkwerMfWv3qdo9zA= -github.com/Shopify/toxiproxy/v2 v2.10.0/go.mod h1:3N/QsT0mtLNpQyO2Zw7quOzwp7mcdXdKk9XlHU/aRgI= +github.com/Shopify/toxiproxy/v2 v2.11.0 h1:iXm78nBN50T2BTs1Z8w1fdC0Y1kltkJZQEyMcYyCgGQ= +github.com/Shopify/toxiproxy/v2 v2.11.0/go.mod h1:EPnGLFvhpcwVKCsbFZwyOq4PxnGg9cFbhMrVT3ROBEo= github.com/aws/aws-sdk-go-v2 v1.32.2 h1:AkNLZEyYMLnx/Q/mSKkcMqwNFXMAvFto9bNsHqcTduI= github.com/aws/aws-sdk-go-v2 v1.32.2/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo= -github.com/aws/aws-sdk-go-v2/config v1.27.43 h1:p33fDDihFC390dhhuv8nOmX419wjOSDQRb+USt20RrU= -github.com/aws/aws-sdk-go-v2/config v1.27.43/go.mod h1:pYhbtvg1siOOg8h5an77rXle9tVG8T+BWLWAo7cOukc= +github.com/aws/aws-sdk-go-v2/config v1.28.0 h1:FosVYWcqEtWNxHn8gB/Vs6jOlNwSoyOCA/g/sxyySOQ= +github.com/aws/aws-sdk-go-v2/config v1.28.0/go.mod h1:pYhbtvg1siOOg8h5an77rXle9tVG8T+BWLWAo7cOukc= github.com/aws/aws-sdk-go-v2/credentials v1.17.41 h1:7gXo+Axmp+R4Z+AK8YFQO0ZV3L0gizGINCOWxSLY9W8= github.com/aws/aws-sdk-go-v2/credentials v1.17.41/go.mod h1:u4Eb8d3394YLubphT4jLEwN1rLNq2wFOlT6OuxFwPzU= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 h1:TMH3f/SCAWdNtXXVPPu5D6wrr4G5hI1rAxbcocKfC7Q= @@ -132,8 +132,8 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.20.4 h1:Tgh3Yr67PaOv/uTqloMsCEdeuFTatm5zIq5+qNN23vI= -github.com/prometheus/client_golang v1.20.4/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= +github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= @@ -339,10 +339,10 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= -google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 h1:T6rh4haD3GVYsgEfWExoCZA2o2FmbNyKpTuAxbEFPTg= -google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9/go.mod h1:wp2WsuBYj6j8wUdo3ToZsdxxixbvQNAHqVJrTgi5E5M= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 h1:QCqS/PdaHTSWGvupk2F/ehwHtGc0/GYkT+3GAcR1CCc= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= +google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 h1:fVoAXEKA4+yufmbdVYv+SE73+cPZbbbe8paLsHfkK+U= +google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53/go.mod h1:riSXTwQ4+nqmPGtobMFyW5FqVAmIs0St6VPp4Ug7CE4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 h1:X58yt85/IXCx0Y3ZwN6sEIKZzQtDEYaBWrDvErdXrRE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= diff --git a/tests/jobs_sqs_test.go b/tests/jobs_sqs_test.go index ebb6e06..0a5a9fa 100644 --- a/tests/jobs_sqs_test.go +++ b/tests/jobs_sqs_test.go @@ -601,35 +601,6 @@ func TestSQSJobsError(t *testing.T) { time.Sleep(time.Second * 5) } -func TestSQSNoGlobalSection(t *testing.T) { - cont := endure.New(slog.LevelDebug) - - cfg := &config.Plugin{ - Version: "2023.3.0", - Path: "configs/.rr-no-global.yaml", - } - - err := cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.Plugin{}, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &sqsPlugin.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - _, err = cont.Serve() - require.Error(t, err) -} - func TestSQSStat(t *testing.T) { cont := endure.New(slog.LevelDebug) @@ -806,15 +777,14 @@ func TestSQSRawPayload(t *testing.T) { time.Sleep(time.Second * 3) awsConf, err := sqsConf.LoadDefaultConfig(context.Background(), + sqsConf.WithBaseEndpoint(os.Getenv("RR_SQS_TEST_ENDPOINT")), sqsConf.WithRegion(os.Getenv("RR_SQS_TEST_REGION")), sqsConf.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(os.Getenv("RR_SQS_TEST_KEY"), os.Getenv("RR_SQS_TEST_SECRET"), ""))) require.NoError(t, err) // config with retries - endpoint := os.Getenv("RR_SQS_TEST_ENDPOINT") client := sqs.NewFromConfig(awsConf, func(o *sqs.Options) { - o.BaseEndpoint = &endpoint o.Retryer = retry.NewStandard(func(so *retry.StandardOptions) { so.MaxAttempts = 60 })