Skip to content

Commit

Permalink
[#235]: chore: support JOBS API v2
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Jun 21, 2023
2 parents f3f3906 + 37c3926 commit 8d59cb0
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 94 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/linters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4 # action page: <https://github.com/actions/setup-go>
with:
go-version: '1.20'
go-version: stable

- name: Run linter
uses: golangci/golangci-lint-action@v3.5.0 # Action page: <https://github.com/golangci/golangci-lint-action>
uses: golangci/golangci-lint-action@v3.6.0 # Action page: <https://github.com/golangci/golangci-lint-action>
with:
version: v1.51 # without patch version
version: v1.53 # without patch version
only-new-issues: false # show only new issues if it's a pull request
args: --timeout=10m --build-tags=race
2 changes: 1 addition & 1 deletion .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
strategy:
fail-fast: true
matrix:
go: ["1.20"]
go: [stable]
os: ["ubuntu-latest"]
steps:
- name: Set up Go ${{ matrix.go }}
Expand Down
3 changes: 0 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,15 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/
enable:
- asciicheck # Simple linter to check that your code does not contain non-ASCII identifiers
- bodyclose # Checks whether HTTP response body is closed successfully
- depguard # Go linter that checks if package imports are in a list of acceptable packages
- dogsled # Checks assignments with too many blank identifiers (e.g. x, _, _, _, := f())
- errcheck # Errcheck is a program for checking for unchecked errors in go programs. These unchecked errors can be critical bugs in some cases
- errorlint # find code that will cause problems with the error wrapping scheme introduced in Go 1.13
- exhaustive # check exhaustiveness of enum switch statements
- exportloopref # checks for pointers to enclosing loop variables
- gochecknoglobals # Checks that no globals are present in Go code
- gochecknoinits # Checks that no init functions are present in Go code
- gocognit # Computes and checks the cognitive complexity of functions
- goconst # Finds repeated strings that could be replaced by a constant
- gocritic # The most opinionated Go source code linter
- gocyclo # Computes and checks the cyclomatic complexity of functions
- gofmt # Gofmt checks whether code was gofmt-ed. By default this tool runs with -s option to check for code simplification
- goimports # Goimports does everything that gofmt does. Additionally it checks unused imports
- revive
Expand Down
30 changes: 14 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ module github.com/roadrunner-server/sqs/v4
go 1.20

require (
github.com/aws/aws-sdk-go-v2 v1.18.0
github.com/aws/aws-sdk-go-v2/config v1.18.25
github.com/aws/aws-sdk-go-v2/credentials v1.13.24
github.com/aws/aws-sdk-go-v2/service/sqs v1.23.0
github.com/aws/aws-sdk-go-v2 v1.18.1
github.com/aws/aws-sdk-go-v2/config v1.18.27
github.com/aws/aws-sdk-go-v2/credentials v1.13.26
github.com/aws/aws-sdk-go-v2/service/sqs v1.23.2
github.com/aws/smithy-go v1.13.5
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.3.0
github.com/roadrunner-server/api/v4 v4.3.2
github.com/roadrunner-server/api/v4 v4.5.0
github.com/roadrunner-server/endure/v2 v2.2.1
github.com/roadrunner-server/errors v1.2.0
github.com/roadrunner-server/sdk/v4 v4.2.6
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/contrib/propagators/jaeger v1.17.0
go.opentelemetry.io/otel v1.16.0
Expand All @@ -23,22 +22,21 @@ require (
)

require (
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.35 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.28 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.12 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.12 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.19.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/roadrunner-server/tcplisten v1.3.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/sys v0.9.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
61 changes: 28 additions & 33 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
github.com/aws/aws-sdk-go-v2 v1.18.0 h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY=
github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2/config v1.18.25 h1:JuYyZcnMPBiFqn87L2cRppo+rNwgah6YwD3VuyvaW6Q=
github.com/aws/aws-sdk-go-v2/config v1.18.25/go.mod h1:dZnYpD5wTW/dQF0rRNLVypB396zWCcPiBIvdvSWHEg4=
github.com/aws/aws-sdk-go-v2/credentials v1.13.24 h1:PjiYyls3QdCrzqUN35jMWtUK1vqVZ+zLfdOa/UPFDp0=
github.com/aws/aws-sdk-go-v2/credentials v1.13.24/go.mod h1:jYPYi99wUOPIFi0rhiOvXeSEReVOzBqFNOX5bXYoG2o=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3 h1:jJPgroehGvjrde3XufFIJUZVK5A2L9a3KwSFgKy9n8w=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3/go.mod h1:4Q0UFP0YJf0NrsEuEYHpM9fTSEVnD16Z3uyEF7J9JGM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 h1:kG5eQilShqmJbv11XL1VpyDbaEJzWxd4zRiCG30GSn4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 h1:vFQlirhuM8lLlpI7imKOMsjdQLuN9CPi+k44F/OFVsk=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34 h1:gGLG7yKaXG02/jBlg210R7VgQIotiQntNhsCFejawx8=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27 h1:0iKliEXAcCa2qVtRs7Ot5hItA2MsufrphbRFlz1Owxo=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27/go.mod h1:EOwBD4J4S5qYszS5/3DpkejfuK+Z5/1uzICfPaZLtqw=
github.com/aws/aws-sdk-go-v2/service/sqs v1.23.0 h1:EgyGgs20+tdc2F2P7mKCD6SkWv/62fsGZlT3N5VFi5M=
github.com/aws/aws-sdk-go-v2/service/sqs v1.23.0/go.mod h1:ujUjm+PrcKUeIiKu2PT7MWjcyY0D6YZRZF3fSswiO+0=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10 h1:UBQjaMTCKwyUYwiVnUt6toEJwGXsLBI6al083tpjJzY=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10/go.mod h1:ouy2P4z6sJN70fR3ka3wD3Ro3KezSxU6eKGQI2+2fjI=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10 h1:PkHIIJs8qvq0e5QybnZoG1K/9QTrLr9OsqCIo59jOBA=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10/go.mod h1:AFvkxc8xfBe8XA+5St5XIHHrQQtkxqrRincx4hmMHOk=
github.com/aws/aws-sdk-go-v2/service/sts v1.19.0 h1:2DQLAKDteoEDI8zpCzqBMaZlJuoE9iTYD0gFmXVax9E=
github.com/aws/aws-sdk-go-v2/service/sts v1.19.0/go.mod h1:BgQOMsg8av8jset59jelyPW7NoZcZXLVpDsXunGDrk8=
github.com/aws/aws-sdk-go-v2 v1.18.1 h1:+tefE750oAb7ZQGzla6bLkOwfcQCEtC5y2RqoqCeqKo=
github.com/aws/aws-sdk-go-v2 v1.18.1/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2/config v1.18.27 h1:Az9uLwmssTE6OGTpsFqOnaGpLnKDqNYOJzWuC6UAYzA=
github.com/aws/aws-sdk-go-v2/config v1.18.27/go.mod h1:0My+YgmkGxeqjXZb5BYme5pc4drjTnM+x1GJ3zv42Nw=
github.com/aws/aws-sdk-go-v2/credentials v1.13.26 h1:qmU+yhKmOCyujmuPY7tf5MxR/RKyZrOPO3V4DobiTUk=
github.com/aws/aws-sdk-go-v2/credentials v1.13.26/go.mod h1:GoXt2YC8jHUBbA4jr+W3JiemnIbkXOfxSXcisUsZ3os=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.4 h1:LxK/bitrAr4lnh9LnIS6i7zWbCOdMsfzKFBI6LUCS0I=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.4/go.mod h1:E1hLXN/BL2e6YizK1zFlYd8vsfi2GTjbjBazinMmeaM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 h1:A5UqQEmPaCFpedKouS4v+dHCTUo2sKqhoKO9U5kxyWo=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34/go.mod h1:wZpTEecJe0Btj3IYnDx/VlUzor9wm3fJHyvLpQF0VwY=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 h1:srIVS45eQuewqz6fKKu6ZGXaq6FuFg5NzgQBAM6g8Y4=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28/go.mod h1:7VRpKQQedkfIEXb4k52I7swUnZP0wohVajJMRn3vsUw=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.35 h1:LWA+3kDM8ly001vJ1X1waCuLJdtTl48gwkPKWy9sosI=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.35/go.mod h1:0Eg1YjxE0Bhn56lx+SHJwCzhW+2JGtizsrx+lCqrfm0=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.28 h1:bkRyG4a929RCnpVSTvLM2j/T4ls015ZhhYApbmYs15s=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.28/go.mod h1:jj7znCIg05jXlaGBlFMGP8+7UN3VtCkRBG2spnmRQkU=
github.com/aws/aws-sdk-go-v2/service/sqs v1.23.2 h1:Y2vfLiY3HmaMisuwx6fS2kMRYbajRXXB+9vesGVPseY=
github.com/aws/aws-sdk-go-v2/service/sqs v1.23.2/go.mod h1:TaV67b6JMD1988x/uMDop/JnMFK6v5d4Ru+sDmFg+ww=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.12 h1:nneMBM2p79PGWBQovYO/6Xnc2ryRMw3InnDJq1FHkSY=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.12/go.mod h1:HuCOxYsF21eKrerARYO6HapNeh9GBNq7fius2AcwodY=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.12 h1:2qTR7IFk7/0IN/adSFhYu9Xthr0zVFTgBrmPldILn80=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.12/go.mod h1:E4VrHCPzmVB/KFXtqBGKb3c8zpbNBgKe3fisDNLAW5w=
github.com/aws/aws-sdk-go-v2/service/sts v1.19.2 h1:XFJ2Z6sNUUcAz9poj+245DMkrHE4h2j5I9/xD50RHfE=
github.com/aws/aws-sdk-go-v2/service/sts v1.19.2/go.mod h1:dp0yLPsLBOi++WTxzCjA/oZqi6NPIhoR+uF7GeMU9eg=
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
Expand All @@ -44,18 +44,13 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/roadrunner-server/api/v4 v4.3.2 h1:1zMfd2P+i9hTDFIsp9nEhCLNe+GmUrUqgnON8ZUO6Mc=
github.com/roadrunner-server/api/v4 v4.3.2/go.mod h1:HFb1kQ/H5UkD7MBNqi4L7hXQTtc919FcO8JKPqoSVzs=
github.com/roadrunner-server/api/v4 v4.5.0 h1:OUAcCwLeQbgRj7E2/6M6W2nxOnbG6XYPSS6LjW6COAQ=
github.com/roadrunner-server/api/v4 v4.5.0/go.mod h1:nzJvLrDMYT0K9hgPFmeL8dh6q2EvrJEaCHy2XRqz20c=
github.com/roadrunner-server/endure/v2 v2.2.1 h1:OkJUSd6+qqTcnl8in3bbyidEOmhO3B9uOVdR0avba28=
github.com/roadrunner-server/endure/v2 v2.2.1/go.mod h1:4eTAr3fASpdyqgFcbqVckOx68dZ4YPECecrcHvAuSdU=
github.com/roadrunner-server/errors v1.2.0 h1:qBmNXt8Iex9QnYTjCkbJKsBZu2EtYkQCM06GUDcQBbI=
github.com/roadrunner-server/errors v1.2.0/go.mod h1:z0ECxZp/dDa5RahtMcy4mBIavVxiZ9vwE5kByl7kFtY=
github.com/roadrunner-server/sdk/v4 v4.2.6 h1:BSQ+HHklszJKGCo91jqRwvgjhSkuz097cbPHMFAhIfo=
github.com/roadrunner-server/sdk/v4 v4.2.6/go.mod h1:WBLEsz9EMY6CkwpdeageMEPLevD/PaUf4rOOsBsaKlo=
github.com/roadrunner-server/tcplisten v1.3.0 h1:VDd6IbP8oIjm5vKvMVozeZgeHgOcoP0XYLOyOqcZHCY=
github.com/roadrunner-server/tcplisten v1.3.0/go.mod h1:VR6Ob5am0oEuLMOeLiVvQxG9ShykAEgrlvZddX8EfoU=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
go.opentelemetry.io/contrib/propagators/jaeger v1.17.0 h1:Zbpbmwav32Ea5jSotpmkWEl3a6Xvd4tw/3xxGO1i05Y=
Expand All @@ -75,8 +70,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
7 changes: 3 additions & 4 deletions plugin.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package sqs

import (
"github.com/roadrunner-server/api/v4/plugins/v1/jobs"
pq "github.com/roadrunner-server/api/v4/plugins/v1/priority_queue"
"github.com/roadrunner-server/api/v4/plugins/v2/jobs"
"github.com/roadrunner-server/endure/v2/dep"
"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sqs/v4/sqsjobs"
Expand Down Expand Up @@ -60,10 +59,10 @@ func (p *Plugin) Collects() []*dep.In {
}
}

func (p *Plugin) DriverFromConfig(configKey string, pq pq.Queue, pipeline jobs.Pipeline, _ chan<- jobs.Commander) (jobs.Driver, error) {
func (p *Plugin) DriverFromConfig(configKey string, pq jobs.Queue, pipeline jobs.Pipeline, _ chan<- jobs.Commander) (jobs.Driver, error) {
return sqsjobs.FromConfig(p.tracer, configKey, pipeline, p.log, p.cfg, pq)
}

func (p *Plugin) DriverFromPipeline(pipe jobs.Pipeline, pq pq.Queue, _ chan<- jobs.Commander) (jobs.Driver, error) {
func (p *Plugin) DriverFromPipeline(pipe jobs.Pipeline, pq jobs.Queue, _ chan<- jobs.Commander) (jobs.Driver, error) {
return sqsjobs.FromPipeline(p.tracer, pipe, p.log, p.cfg, pq)
}
42 changes: 31 additions & 11 deletions sqsjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/retry"
Expand All @@ -16,8 +17,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/aws/smithy-go"
"github.com/roadrunner-server/api/v4/plugins/v1/jobs"
pq "github.com/roadrunner-server/api/v4/plugins/v1/priority_queue"
"github.com/roadrunner-server/api/v4/plugins/v2/jobs"
"github.com/roadrunner-server/errors"
jprop "go.opentelemetry.io/contrib/propagators/jaeger"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -50,7 +50,7 @@ type Driver struct {
msgInFlight *int64
msgInFlightLimit *int32

pq pq.Queue
pq jobs.Queue
log *zap.Logger
pipeline atomic.Pointer[jobs.Pipeline]
consumeAll bool
Expand Down Expand Up @@ -78,10 +78,11 @@ type Driver struct {
client *sqs.Client
queueURL *string

stopped uint64
pauseCh chan struct{}
}

func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq pq.Queue) (*Driver, error) {
func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_sqs_consumer")
/*
we need to determine in what environment we are running
Expand Down Expand Up @@ -173,7 +174,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip
return jb, nil
}

func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq pq.Queue) (*Driver, error) {
func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_sqs_consumer")

/*
Expand Down Expand Up @@ -267,7 +268,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
return jb, nil
}

func (c *Driver) Push(ctx context.Context, jb jobs.Job) error {
func (c *Driver) Push(ctx context.Context, jb jobs.Message) error {
const op = errors.Op("sqs_push")
// check if the pipeline registered

Expand All @@ -276,8 +277,8 @@ func (c *Driver) Push(ctx context.Context, jb jobs.Job) error {

// load atomic value
pipe := *c.pipeline.Load()
if pipe.Name() != jb.Pipeline() {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Pipeline(), pipe.Name()))
if pipe.Name() != jb.GroupID() {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.GroupID(), pipe.Name()))
}

// The length of time, in seconds, for which to delay a specific message. Valid
Expand Down Expand Up @@ -316,7 +317,7 @@ func (c *Driver) Run(ctx context.Context, p jobs.Pipeline) error {
ctxCancel, c.cancel = context.WithCancel(context.Background())
c.listen(ctxCancel)

c.log.Debug("pipeline is active", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Duration("elapsed", time.Since(start)))
c.log.Debug("pipeline was started", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Duration("elapsed", time.Since(start)))
return nil
}

Expand All @@ -326,6 +327,10 @@ func (c *Driver) Stop(ctx context.Context) error {
_, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "sqs_stop")
defer span.End()

atomic.StoreUint64(&c.stopped, 1)
pipe := *c.pipeline.Load()
_ = c.pq.Remove(pipe.Name())

if atomic.LoadUint32(&c.listeners) > 0 {
if c.cancel != nil {
c.cancel()
Expand All @@ -336,7 +341,6 @@ func (c *Driver) Stop(ctx context.Context) error {
c.pauseCh <- struct{}{}
}

pipe := *c.pipeline.Load()
c.log.Debug("pipeline was stopped", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", time.Now().UTC()), zap.Duration("elapsed", time.Since(start)))
return nil
}
Expand Down Expand Up @@ -456,7 +460,7 @@ func (c *Driver) State(ctx context.Context) (*jobs.State, error) {
}

func (c *Driver) handleItem(ctx context.Context, msg *Item) error {
c.prop.Inject(ctx, propagation.HeaderCarrier(msg.Headers))
c.prop.Inject(ctx, propagation.HeaderCarrier(msg.headers))

d, err := msg.pack(c.queueURL, c.queue, c.messageGroupID)
if err != nil {
Expand Down Expand Up @@ -613,3 +617,19 @@ func ptr[T any](val T) *T {
func ready(r uint32) bool {
return r > 0
}

func bytesToStr(data []byte) string {
if len(data) == 0 {
return ""
}

return unsafe.String(unsafe.SliceData(data), len(data))
}

func strToBytes(data string) []byte {
if data == "" {
return nil
}

return unsafe.Slice(unsafe.StringData(data), len(data))
}
Loading

0 comments on commit 8d59cb0

Please sign in to comment.