Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: support JOBS API v2 #235

Merged
merged 4 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/linters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4 # action page: <https://github.com/actions/setup-go>
with:
go-version: '1.20'
go-version: stable

- name: Run linter
uses: golangci/golangci-lint-action@v3.5.0 # Action page: <https://github.com/golangci/golangci-lint-action>
uses: golangci/golangci-lint-action@v3.6.0 # Action page: <https://github.com/golangci/golangci-lint-action>
with:
version: v1.51 # without patch version
version: v1.53 # without patch version
only-new-issues: false # show only new issues if it's a pull request
args: --timeout=10m --build-tags=race
2 changes: 1 addition & 1 deletion .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
strategy:
fail-fast: true
matrix:
go: ["1.20"]
go: [stable]
os: ["ubuntu-latest"]
steps:
- name: Set up Go ${{ matrix.go }}
Expand Down
3 changes: 0 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,15 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/
enable:
- asciicheck # Simple linter to check that your code does not contain non-ASCII identifiers
- bodyclose # Checks whether HTTP response body is closed successfully
- depguard # Go linter that checks if package imports are in a list of acceptable packages
- dogsled # Checks assignments with too many blank identifiers (e.g. x, _, _, _, := f())
- errcheck # Errcheck is a program for checking for unchecked errors in go programs. These unchecked errors can be critical bugs in some cases
- errorlint # find code that will cause problems with the error wrapping scheme introduced in Go 1.13
- exhaustive # check exhaustiveness of enum switch statements
- exportloopref # checks for pointers to enclosing loop variables
- gochecknoglobals # Checks that no globals are present in Go code
- gochecknoinits # Checks that no init functions are present in Go code
- gocognit # Computes and checks the cognitive complexity of functions
- goconst # Finds repeated strings that could be replaced by a constant
- gocritic # The most opinionated Go source code linter
- gocyclo # Computes and checks the cyclomatic complexity of functions
- gofmt # Gofmt checks whether code was gofmt-ed. By default this tool runs with -s option to check for code simplification
- goimports # Goimports does everything that gofmt does. Additionally it checks unused imports
- revive
Expand Down
30 changes: 14 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ module github.com/roadrunner-server/sqs/v4
go 1.20

require (
github.com/aws/aws-sdk-go-v2 v1.18.0
github.com/aws/aws-sdk-go-v2/config v1.18.25
github.com/aws/aws-sdk-go-v2/credentials v1.13.24
github.com/aws/aws-sdk-go-v2/service/sqs v1.23.0
github.com/aws/aws-sdk-go-v2 v1.18.1
github.com/aws/aws-sdk-go-v2/config v1.18.27
github.com/aws/aws-sdk-go-v2/credentials v1.13.26
github.com/aws/aws-sdk-go-v2/service/sqs v1.23.2
github.com/aws/smithy-go v1.13.5
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.3.0
github.com/roadrunner-server/api/v4 v4.3.2
github.com/roadrunner-server/api/v4 v4.5.0
github.com/roadrunner-server/endure/v2 v2.2.1
github.com/roadrunner-server/errors v1.2.0
github.com/roadrunner-server/sdk/v4 v4.2.6
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/contrib/propagators/jaeger v1.17.0
go.opentelemetry.io/otel v1.16.0
Expand All @@ -23,22 +22,21 @@ require (
)

require (
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.35 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.28 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.12 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.12 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.19.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/roadrunner-server/tcplisten v1.3.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/sys v0.9.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
61 changes: 28 additions & 33 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
github.com/aws/aws-sdk-go-v2 v1.18.0 h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY=
github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2/config v1.18.25 h1:JuYyZcnMPBiFqn87L2cRppo+rNwgah6YwD3VuyvaW6Q=
github.com/aws/aws-sdk-go-v2/config v1.18.25/go.mod h1:dZnYpD5wTW/dQF0rRNLVypB396zWCcPiBIvdvSWHEg4=
github.com/aws/aws-sdk-go-v2/credentials v1.13.24 h1:PjiYyls3QdCrzqUN35jMWtUK1vqVZ+zLfdOa/UPFDp0=
github.com/aws/aws-sdk-go-v2/credentials v1.13.24/go.mod h1:jYPYi99wUOPIFi0rhiOvXeSEReVOzBqFNOX5bXYoG2o=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3 h1:jJPgroehGvjrde3XufFIJUZVK5A2L9a3KwSFgKy9n8w=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3/go.mod h1:4Q0UFP0YJf0NrsEuEYHpM9fTSEVnD16Z3uyEF7J9JGM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 h1:kG5eQilShqmJbv11XL1VpyDbaEJzWxd4zRiCG30GSn4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 h1:vFQlirhuM8lLlpI7imKOMsjdQLuN9CPi+k44F/OFVsk=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34 h1:gGLG7yKaXG02/jBlg210R7VgQIotiQntNhsCFejawx8=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27 h1:0iKliEXAcCa2qVtRs7Ot5hItA2MsufrphbRFlz1Owxo=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27/go.mod h1:EOwBD4J4S5qYszS5/3DpkejfuK+Z5/1uzICfPaZLtqw=
github.com/aws/aws-sdk-go-v2/service/sqs v1.23.0 h1:EgyGgs20+tdc2F2P7mKCD6SkWv/62fsGZlT3N5VFi5M=
github.com/aws/aws-sdk-go-v2/service/sqs v1.23.0/go.mod h1:ujUjm+PrcKUeIiKu2PT7MWjcyY0D6YZRZF3fSswiO+0=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10 h1:UBQjaMTCKwyUYwiVnUt6toEJwGXsLBI6al083tpjJzY=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10/go.mod h1:ouy2P4z6sJN70fR3ka3wD3Ro3KezSxU6eKGQI2+2fjI=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10 h1:PkHIIJs8qvq0e5QybnZoG1K/9QTrLr9OsqCIo59jOBA=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10/go.mod h1:AFvkxc8xfBe8XA+5St5XIHHrQQtkxqrRincx4hmMHOk=
github.com/aws/aws-sdk-go-v2/service/sts v1.19.0 h1:2DQLAKDteoEDI8zpCzqBMaZlJuoE9iTYD0gFmXVax9E=
github.com/aws/aws-sdk-go-v2/service/sts v1.19.0/go.mod h1:BgQOMsg8av8jset59jelyPW7NoZcZXLVpDsXunGDrk8=
github.com/aws/aws-sdk-go-v2 v1.18.1 h1:+tefE750oAb7ZQGzla6bLkOwfcQCEtC5y2RqoqCeqKo=
github.com/aws/aws-sdk-go-v2 v1.18.1/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2/config v1.18.27 h1:Az9uLwmssTE6OGTpsFqOnaGpLnKDqNYOJzWuC6UAYzA=
github.com/aws/aws-sdk-go-v2/config v1.18.27/go.mod h1:0My+YgmkGxeqjXZb5BYme5pc4drjTnM+x1GJ3zv42Nw=
github.com/aws/aws-sdk-go-v2/credentials v1.13.26 h1:qmU+yhKmOCyujmuPY7tf5MxR/RKyZrOPO3V4DobiTUk=
github.com/aws/aws-sdk-go-v2/credentials v1.13.26/go.mod h1:GoXt2YC8jHUBbA4jr+W3JiemnIbkXOfxSXcisUsZ3os=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.4 h1:LxK/bitrAr4lnh9LnIS6i7zWbCOdMsfzKFBI6LUCS0I=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.4/go.mod h1:E1hLXN/BL2e6YizK1zFlYd8vsfi2GTjbjBazinMmeaM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 h1:A5UqQEmPaCFpedKouS4v+dHCTUo2sKqhoKO9U5kxyWo=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34/go.mod h1:wZpTEecJe0Btj3IYnDx/VlUzor9wm3fJHyvLpQF0VwY=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 h1:srIVS45eQuewqz6fKKu6ZGXaq6FuFg5NzgQBAM6g8Y4=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28/go.mod h1:7VRpKQQedkfIEXb4k52I7swUnZP0wohVajJMRn3vsUw=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.35 h1:LWA+3kDM8ly001vJ1X1waCuLJdtTl48gwkPKWy9sosI=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.35/go.mod h1:0Eg1YjxE0Bhn56lx+SHJwCzhW+2JGtizsrx+lCqrfm0=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.28 h1:bkRyG4a929RCnpVSTvLM2j/T4ls015ZhhYApbmYs15s=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.28/go.mod h1:jj7znCIg05jXlaGBlFMGP8+7UN3VtCkRBG2spnmRQkU=
github.com/aws/aws-sdk-go-v2/service/sqs v1.23.2 h1:Y2vfLiY3HmaMisuwx6fS2kMRYbajRXXB+9vesGVPseY=
github.com/aws/aws-sdk-go-v2/service/sqs v1.23.2/go.mod h1:TaV67b6JMD1988x/uMDop/JnMFK6v5d4Ru+sDmFg+ww=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.12 h1:nneMBM2p79PGWBQovYO/6Xnc2ryRMw3InnDJq1FHkSY=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.12/go.mod h1:HuCOxYsF21eKrerARYO6HapNeh9GBNq7fius2AcwodY=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.12 h1:2qTR7IFk7/0IN/adSFhYu9Xthr0zVFTgBrmPldILn80=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.12/go.mod h1:E4VrHCPzmVB/KFXtqBGKb3c8zpbNBgKe3fisDNLAW5w=
github.com/aws/aws-sdk-go-v2/service/sts v1.19.2 h1:XFJ2Z6sNUUcAz9poj+245DMkrHE4h2j5I9/xD50RHfE=
github.com/aws/aws-sdk-go-v2/service/sts v1.19.2/go.mod h1:dp0yLPsLBOi++WTxzCjA/oZqi6NPIhoR+uF7GeMU9eg=
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
Expand All @@ -44,18 +44,13 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/roadrunner-server/api/v4 v4.3.2 h1:1zMfd2P+i9hTDFIsp9nEhCLNe+GmUrUqgnON8ZUO6Mc=
github.com/roadrunner-server/api/v4 v4.3.2/go.mod h1:HFb1kQ/H5UkD7MBNqi4L7hXQTtc919FcO8JKPqoSVzs=
github.com/roadrunner-server/api/v4 v4.5.0 h1:OUAcCwLeQbgRj7E2/6M6W2nxOnbG6XYPSS6LjW6COAQ=
github.com/roadrunner-server/api/v4 v4.5.0/go.mod h1:nzJvLrDMYT0K9hgPFmeL8dh6q2EvrJEaCHy2XRqz20c=
github.com/roadrunner-server/endure/v2 v2.2.1 h1:OkJUSd6+qqTcnl8in3bbyidEOmhO3B9uOVdR0avba28=
github.com/roadrunner-server/endure/v2 v2.2.1/go.mod h1:4eTAr3fASpdyqgFcbqVckOx68dZ4YPECecrcHvAuSdU=
github.com/roadrunner-server/errors v1.2.0 h1:qBmNXt8Iex9QnYTjCkbJKsBZu2EtYkQCM06GUDcQBbI=
github.com/roadrunner-server/errors v1.2.0/go.mod h1:z0ECxZp/dDa5RahtMcy4mBIavVxiZ9vwE5kByl7kFtY=
github.com/roadrunner-server/sdk/v4 v4.2.6 h1:BSQ+HHklszJKGCo91jqRwvgjhSkuz097cbPHMFAhIfo=
github.com/roadrunner-server/sdk/v4 v4.2.6/go.mod h1:WBLEsz9EMY6CkwpdeageMEPLevD/PaUf4rOOsBsaKlo=
github.com/roadrunner-server/tcplisten v1.3.0 h1:VDd6IbP8oIjm5vKvMVozeZgeHgOcoP0XYLOyOqcZHCY=
github.com/roadrunner-server/tcplisten v1.3.0/go.mod h1:VR6Ob5am0oEuLMOeLiVvQxG9ShykAEgrlvZddX8EfoU=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
go.opentelemetry.io/contrib/propagators/jaeger v1.17.0 h1:Zbpbmwav32Ea5jSotpmkWEl3a6Xvd4tw/3xxGO1i05Y=
Expand All @@ -75,8 +70,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
7 changes: 3 additions & 4 deletions plugin.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package sqs

import (
"github.com/roadrunner-server/api/v4/plugins/v1/jobs"
pq "github.com/roadrunner-server/api/v4/plugins/v1/priority_queue"
"github.com/roadrunner-server/api/v4/plugins/v2/jobs"
"github.com/roadrunner-server/endure/v2/dep"
"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sqs/v4/sqsjobs"
Expand Down Expand Up @@ -60,10 +59,10 @@ func (p *Plugin) Collects() []*dep.In {
}
}

func (p *Plugin) DriverFromConfig(configKey string, pq pq.Queue, pipeline jobs.Pipeline, _ chan<- jobs.Commander) (jobs.Driver, error) {
func (p *Plugin) DriverFromConfig(configKey string, pq jobs.Queue, pipeline jobs.Pipeline, _ chan<- jobs.Commander) (jobs.Driver, error) {
return sqsjobs.FromConfig(p.tracer, configKey, pipeline, p.log, p.cfg, pq)
}

func (p *Plugin) DriverFromPipeline(pipe jobs.Pipeline, pq pq.Queue, _ chan<- jobs.Commander) (jobs.Driver, error) {
func (p *Plugin) DriverFromPipeline(pipe jobs.Pipeline, pq jobs.Queue, _ chan<- jobs.Commander) (jobs.Driver, error) {
return sqsjobs.FromPipeline(p.tracer, pipe, p.log, p.cfg, pq)
}
42 changes: 31 additions & 11 deletions sqsjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/retry"
Expand All @@ -16,8 +17,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/aws/smithy-go"
"github.com/roadrunner-server/api/v4/plugins/v1/jobs"
pq "github.com/roadrunner-server/api/v4/plugins/v1/priority_queue"
"github.com/roadrunner-server/api/v4/plugins/v2/jobs"
"github.com/roadrunner-server/errors"
jprop "go.opentelemetry.io/contrib/propagators/jaeger"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -50,7 +50,7 @@ type Driver struct {
msgInFlight *int64
msgInFlightLimit *int32

pq pq.Queue
pq jobs.Queue
log *zap.Logger
pipeline atomic.Pointer[jobs.Pipeline]
consumeAll bool
Expand Down Expand Up @@ -78,10 +78,11 @@ type Driver struct {
client *sqs.Client
queueURL *string

stopped uint64
pauseCh chan struct{}
}

func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq pq.Queue) (*Driver, error) {
func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_sqs_consumer")
/*
we need to determine in what environment we are running
Expand Down Expand Up @@ -173,7 +174,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip
return jb, nil
}

func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq pq.Queue) (*Driver, error) {
func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_sqs_consumer")

/*
Expand Down Expand Up @@ -267,7 +268,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
return jb, nil
}

func (c *Driver) Push(ctx context.Context, jb jobs.Job) error {
func (c *Driver) Push(ctx context.Context, jb jobs.Message) error {
const op = errors.Op("sqs_push")
// check if the pipeline registered

Expand All @@ -276,8 +277,8 @@ func (c *Driver) Push(ctx context.Context, jb jobs.Job) error {

// load atomic value
pipe := *c.pipeline.Load()
if pipe.Name() != jb.Pipeline() {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Pipeline(), pipe.Name()))
if pipe.Name() != jb.GroupID() {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.GroupID(), pipe.Name()))
}

// The length of time, in seconds, for which to delay a specific message. Valid
Expand Down Expand Up @@ -316,7 +317,7 @@ func (c *Driver) Run(ctx context.Context, p jobs.Pipeline) error {
ctxCancel, c.cancel = context.WithCancel(context.Background())
c.listen(ctxCancel)

c.log.Debug("pipeline is active", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Duration("elapsed", time.Since(start)))
c.log.Debug("pipeline was started", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Duration("elapsed", time.Since(start)))
return nil
}

Expand All @@ -326,6 +327,10 @@ func (c *Driver) Stop(ctx context.Context) error {
_, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "sqs_stop")
defer span.End()

atomic.StoreUint64(&c.stopped, 1)
pipe := *c.pipeline.Load()
_ = c.pq.Remove(pipe.Name())

if atomic.LoadUint32(&c.listeners) > 0 {
if c.cancel != nil {
c.cancel()
Expand All @@ -336,7 +341,6 @@ func (c *Driver) Stop(ctx context.Context) error {
c.pauseCh <- struct{}{}
}

pipe := *c.pipeline.Load()
c.log.Debug("pipeline was stopped", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", time.Now().UTC()), zap.Duration("elapsed", time.Since(start)))
return nil
}
Expand Down Expand Up @@ -456,7 +460,7 @@ func (c *Driver) State(ctx context.Context) (*jobs.State, error) {
}

func (c *Driver) handleItem(ctx context.Context, msg *Item) error {
c.prop.Inject(ctx, propagation.HeaderCarrier(msg.Headers))
c.prop.Inject(ctx, propagation.HeaderCarrier(msg.headers))

d, err := msg.pack(c.queueURL, c.queue, c.messageGroupID)
if err != nil {
Expand Down Expand Up @@ -613,3 +617,19 @@ func ptr[T any](val T) *T {
func ready(r uint32) bool {
return r > 0
}

func bytesToStr(data []byte) string {
if len(data) == 0 {
return ""
}

return unsafe.String(unsafe.SliceData(data), len(data))
}

func strToBytes(data string) []byte {
if data == "" {
return nil
}

return unsafe.Slice(unsafe.StringData(data), len(data))
}
Loading