diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml index 36de79f..79294b8 100644 --- a/.github/workflows/linters.yml +++ b/.github/workflows/linters.yml @@ -13,11 +13,11 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 # action page: with: - go-version: '1.20' + go-version: stable - name: Run linter - uses: golangci/golangci-lint-action@v3.5.0 # Action page: + uses: golangci/golangci-lint-action@v3.6.0 # Action page: with: - version: v1.51 # without patch version + version: v1.53 # without patch version only-new-issues: false # show only new issues if it's a pull request args: --timeout=10m --build-tags=race diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 4705873..e86b989 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -24,7 +24,7 @@ jobs: strategy: fail-fast: true matrix: - go: ["1.20"] + go: [stable] os: ["ubuntu-latest"] steps: - name: Set up Go ${{ matrix.go }} diff --git a/.golangci.yml b/.golangci.yml index b5d5a91..33a23a8 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -44,7 +44,6 @@ linters: # All available linters list: 0 { if c.cancel != nil { c.cancel() @@ -336,7 +341,6 @@ func (c *Driver) Stop(ctx context.Context) error { c.pauseCh <- struct{}{} } - pipe := *c.pipeline.Load() c.log.Debug("pipeline was stopped", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", time.Now().UTC()), zap.Duration("elapsed", time.Since(start))) return nil } @@ -456,7 +460,7 @@ func (c *Driver) State(ctx context.Context) (*jobs.State, error) { } func (c *Driver) handleItem(ctx context.Context, msg *Item) error { - c.prop.Inject(ctx, propagation.HeaderCarrier(msg.Headers)) + c.prop.Inject(ctx, propagation.HeaderCarrier(msg.headers)) d, err := msg.pack(c.queueURL, c.queue, c.messageGroupID) if err != nil { @@ -613,3 +617,19 @@ func ptr[T any](val T) *T { func ready(r uint32) bool { return r > 0 } + +func bytesToStr(data []byte) string { + if len(data) == 0 { + return "" + } + + return unsafe.String(unsafe.SliceData(data), len(data)) +} + +func strToBytes(data string) []byte { + if data == "" { + return nil + } + + return unsafe.Slice(unsafe.StringData(data), len(data)) +} diff --git a/sqsjobs/item.go b/sqsjobs/item.go index 903bf4c..01b98bd 100644 --- a/sqsjobs/item.go +++ b/sqsjobs/item.go @@ -13,9 +13,8 @@ import ( "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/goccy/go-json" "github.com/google/uuid" - "github.com/roadrunner-server/api/v4/plugins/v1/jobs" + "github.com/roadrunner-server/api/v4/plugins/v2/jobs" "github.com/roadrunner-server/errors" - "github.com/roadrunner-server/sdk/v4/utils" "go.uber.org/zap" ) @@ -47,7 +46,7 @@ type Item struct { // Payload is string data (usually JSON) passed to Job broker. Payload string `json:"payload"` // Headers with key-values pairs - Headers map[string][]string `json:"headers"` + headers map[string][]string // Options contains set of PipelineOptions specific to job execution. Can be empty. Options *Options `json:"options,omitempty"` } @@ -68,6 +67,7 @@ type Options struct { // Private ================ cond *sync.Cond + stopped *uint64 msgInFlight *int64 approxReceiveCount int64 queue *string @@ -89,13 +89,17 @@ func (i *Item) Priority() int64 { return i.Options.Priority } -func (i *Item) Metadata() map[string][]string { - return i.Headers +func (i *Item) GroupID() string { + return i.Options.Pipeline +} + +func (i *Item) Headers() map[string][]string { + return i.headers } // Body packs job payload into binary payload. func (i *Item) Body() []byte { - return utils.AsBytes(i.Payload) + return strToBytes(i.Payload) } // Context packs job context (job, id) into binary payload. @@ -113,7 +117,7 @@ func (i *Item) Context() ([]byte, error) { ID: i.Ident, Job: i.Job, Driver: pluginName, - Headers: i.Headers, + Headers: i.headers, Queue: i.Options.Queue, Pipeline: i.Options.Pipeline, }, @@ -127,6 +131,9 @@ func (i *Item) Context() ([]byte, error) { } func (i *Item) Ack() error { + if atomic.LoadUint64(i.Options.stopped) == 1 { + return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped") + } defer func() { i.Options.cond.Signal() atomic.AddInt64(i.Options.msgInFlight, ^int64(0)) @@ -148,6 +155,9 @@ func (i *Item) Ack() error { } func (i *Item) Nack() error { + if atomic.LoadUint64(i.Options.stopped) == 1 { + return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped") + } defer func() { i.Options.cond.Signal() atomic.AddInt64(i.Options.msgInFlight, ^int64(0)) @@ -176,13 +186,16 @@ func (i *Item) Nack() error { } func (i *Item) Requeue(headers map[string][]string, delay int64) error { + if atomic.LoadUint64(i.Options.stopped) == 1 { + return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped") + } defer func() { i.Options.cond.Signal() atomic.AddInt64(i.Options.msgInFlight, ^int64(0)) }() // overwrite the delay i.Options.Delay = delay - i.Headers = headers + i.headers = headers // requeue message err := i.Options.requeueFn(context.Background(), i) @@ -206,19 +219,15 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { return nil } -func (i *Item) Respond(_ []byte, _ string) error { - return nil -} - -func fromJob(job jobs.Job) *Item { +func fromJob(job jobs.Message) *Item { return &Item{ Job: job.Name(), Ident: job.ID(), Payload: job.Payload(), - Headers: job.Headers(), + headers: job.Headers(), Options: &Options{ Priority: job.Priority(), - Pipeline: job.Pipeline(), + Pipeline: job.GroupID(), Delay: job.Delay(), AutoAck: job.AutoAck(), }, @@ -227,7 +236,7 @@ func fromJob(job jobs.Job) *Item { func (i *Item) pack(queueURL, origQueue *string, mg string) (*sqs.SendMessageInput, error) { // pack headers map - data, err := json.Marshal(i.Headers) + data, err := json.Marshal(i.headers) if err != nil { return nil, err } @@ -252,6 +261,8 @@ func (i *Item) pack(queueURL, origQueue *string, mg string) (*sqs.SendMessageInp func (c *Driver) fromMsg(msg *types.Message) (*Item, error) { item, err := c.unpack(msg) + // 2023.2.0 + item.Options.stopped = &c.stopped if err == nil { return item, nil } @@ -279,8 +290,8 @@ func (c *Driver) fromMsg(msg *types.Message) (*Item, error) { return &Item{ Job: auto, Ident: id, - Payload: utils.AsString(data), - Headers: convAttr(msg.Attributes), + Payload: bytesToStr(data), + headers: convAttr(msg.Attributes), Options: &Options{ Priority: 10, Pipeline: (*c.pipeline.Load()).Name(), @@ -301,7 +312,7 @@ func (c *Driver) fromMsg(msg *types.Message) (*Item, error) { Job: auto, Ident: id, Payload: checkBody(msg.Body), - Headers: convAttr(msg.Attributes), + headers: convAttr(msg.Attributes), Options: &Options{ Priority: 10, Queue: checkBody(c.queue), @@ -368,7 +379,7 @@ func (c *Driver) unpack(msg *types.Message) (*Item, error) { Job: *msg.MessageAttributes[jobs.RRJob].StringValue, Ident: *msg.MessageAttributes[jobs.RRID].StringValue, Payload: *msg.Body, - Headers: h, + headers: h, Options: &Options{ AutoAck: autoAck, Delay: int64(d), @@ -443,5 +454,5 @@ func checkBody(body *string) string { func isJSONEncoded(data *string) error { var a any - return json.Unmarshal(utils.AsBytes(*data), &a) + return json.Unmarshal(strToBytes(*data), &a) } diff --git a/sqsjobs/listener.go b/sqsjobs/listener.go index 7c1f501..218de53 100644 --- a/sqsjobs/listener.go +++ b/sqsjobs/listener.go @@ -85,7 +85,7 @@ func (c *Driver) listen(ctx context.Context) { //nolint:gocognit c.log.Debug("receive message", zap.Stringp("ID", m.MessageId)) item, errUnp := c.fromMsg(&m) - ctx := c.prop.Extract(context.Background(), propagation.HeaderCarrier(item.Headers)) + ctx := c.prop.Extract(context.Background(), propagation.HeaderCarrier(item.headers)) ctx, span := c.tracer.Tracer(tracerName).Start(ctx, "sqs_listener") if errUnp != nil { @@ -142,7 +142,11 @@ func (c *Driver) listen(ctx context.Context) { //nolint:gocognit span.End() } - c.prop.Inject(ctx, propagation.HeaderCarrier(item.Headers)) + if item.headers == nil { + item.headers = make(map[string][]string, 2) + } + + c.prop.Inject(ctx, propagation.HeaderCarrier(item.headers)) c.pq.Insert(item) // increase the current number of messages