Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add support for ErrorVisibilityTimeout and job retention #571

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
48 changes: 40 additions & 8 deletions sqsjobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ import (
)

const (
attributes string = "attributes"
tags string = "tags"
queue string = "queue"
pref string = "prefetch"
visibility string = "visibility_timeout"
messageGroupID string = "message_group_id"
waitTime string = "wait_time"
skipQueueDeclaration string = "skip_queue_declaration"
attributes string = "attributes"
tags string = "tags"
queue string = "queue"
pref string = "prefetch"
visibility string = "visibility_timeout"
errorVisibilityTimeout string = "error_visibility_timeout"
retainFailedJobs string = "retain_failed_jobs"
messageGroupID string = "message_group_id"
waitTime string = "wait_time"
skipQueueDeclaration string = "skip_queue_declaration"
maxVisibilityTimeout int32 = 43200
)

// Config is used to parse pipeline configuration
Expand All @@ -34,15 +37,31 @@ type Config struct {
// The duration (in seconds) that the received messages are hidden from subsequent
// retrieve requests after being retrieved by a ReceiveMessage request.
VisibilityTimeout int32 `mapstructure:"visibility_timeout"`

// If defined (> 0) and RetainFailedJobs is true, RR will change the visibility timeout of failed jobs and let them
// be received again, instead of deleting and re-queueing them as new jobs. This allows you to use the automatic SQS
// dead-letter feature by setting a maximum receive count on your queue. This produces similar behavior to Elastic
// Beanstalk's worker environments.
// If this is enabled, your driver credentials must have the sqs:ChangeMessageVisibility permission for the queue.
ErrorVisibilityTimeout int32 `mapstructure:"error_visibility_timeout"`

// Whether to retain failed jobs in the queue. If you set this to true, jobs will be consumed by the
// workers again after VisibilityTimeout, or ErrorVisibilityTimeout (if set), has passed.
// If this is false, jobs will be deleted from the queue and immediately queued again as new jobs.
// Defaults to false.
RetainFailedJobs bool `mapstructure:"retain_failed_jobs"`
rustatian marked this conversation as resolved.
Show resolved Hide resolved

// The duration (in seconds) for which the call waits for a message to arrive
// in the queue before returning. If a message is available, the call returns
// sooner than WaitTimeSeconds. If no messages are available and the wait time
// expires, the call returns successfully with an empty list of messages.
WaitTimeSeconds int32 `mapstructure:"wait_time_seconds"`

// Prefetch is the maximum number of messages to return. Amazon SQS never returns more messages
// than this value (however, fewer messages might be returned). Valid values: 1 to
// 10. Default: 1.
Prefetch int32 `mapstructure:"prefetch"`

// The name of the new queue. The following limits apply to this name:
//
// * A queue
Expand Down Expand Up @@ -121,6 +140,19 @@ func (c *Config) InitDefault() {
c.WaitTimeSeconds = 5
}

// Make sure visibility timeouts are within the allowed boundaries.
if c.VisibilityTimeout < 0 {
c.VisibilityTimeout = 0
} else if c.VisibilityTimeout > maxVisibilityTimeout {
c.VisibilityTimeout = maxVisibilityTimeout
}

if c.ErrorVisibilityTimeout < 0 {
c.ErrorVisibilityTimeout = 0
} else if c.ErrorVisibilityTimeout > maxVisibilityTimeout {
c.ErrorVisibilityTimeout = maxVisibilityTimeout
}

if c.Attributes != nil {
newAttr := make(map[string]string, len(c.Attributes))
toAwsAttribute(c.Attributes, newAttr)
Expand Down
66 changes: 36 additions & 30 deletions sqsjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ type Driver struct {
cancel context.CancelFunc

// connection info
queue *string
messageGroupID string
waitTime int32
visibilityTimeout int32
queue *string
messageGroupID string
waitTime int32
visibilityTimeout int32
errorVisibilityTimeout int32
retainFailedJobs bool

// if a user invokes several resume operations
listeners uint32
Expand Down Expand Up @@ -113,19 +115,21 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip

// initialize job Driver
jb := &Driver{
tracer: tracer,
prop: prop,
cond: sync.Cond{L: &sync.Mutex{}},
pq: pq,
log: log,
skipDeclare: conf.SkipQueueDeclaration,
messageGroupID: conf.MessageGroupID,
attributes: conf.Attributes,
tags: conf.Tags,
queue: conf.Queue,
visibilityTimeout: conf.VisibilityTimeout,
waitTime: conf.WaitTimeSeconds,
pauseCh: make(chan struct{}, 1),
tracer: tracer,
prop: prop,
cond: sync.Cond{L: &sync.Mutex{}},
pq: pq,
log: log,
skipDeclare: conf.SkipQueueDeclaration,
messageGroupID: conf.MessageGroupID,
attributes: conf.Attributes,
tags: conf.Tags,
queue: conf.Queue,
visibilityTimeout: conf.VisibilityTimeout,
errorVisibilityTimeout: conf.ErrorVisibilityTimeout,
retainFailedJobs: conf.RetainFailedJobs,
waitTime: conf.WaitTimeSeconds,
pauseCh: make(chan struct{}, 1),
nickdnk marked this conversation as resolved.
Show resolved Hide resolved
// new in 2.12.1
msgInFlightLimit: ptr(conf.Prefetch),
msgInFlight: ptr(int64(0)),
Expand Down Expand Up @@ -195,19 +199,21 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.

// initialize job Driver
jb := &Driver{
tracer: tracer,
prop: prop,
cond: sync.Cond{L: &sync.Mutex{}},
pq: pq,
log: log,
messageGroupID: pipe.String(messageGroupID, ""),
attributes: attr,
tags: tg,
skipDeclare: pipe.Bool(skipQueueDeclaration, false),
queue: aws.String(pipe.String(queue, "default")),
visibilityTimeout: int32(pipe.Int(visibility, 0)), //nolint:gosec
waitTime: int32(pipe.Int(waitTime, 0)), //nolint:gosec
pauseCh: make(chan struct{}, 1),
tracer: tracer,
prop: prop,
cond: sync.Cond{L: &sync.Mutex{}},
pq: pq,
log: log,
messageGroupID: pipe.String(messageGroupID, ""),
attributes: attr,
tags: tg,
skipDeclare: pipe.Bool(skipQueueDeclaration, false),
queue: aws.String(pipe.String(queue, "default")),
visibilityTimeout: int32(pipe.Int(visibility, 0)), //nolint:gosec
errorVisibilityTimeout: int32(pipe.Int(errorVisibilityTimeout, 0)), //nolint:gosec
retainFailedJobs: pipe.Bool(retainFailedJobs, false),
waitTime: int32(pipe.Int(waitTime, 0)), //nolint:gosec
pauseCh: make(chan struct{}, 1),
// new in 2.12.1
msgInFlightLimit: ptr(int32(pipe.Int(pref, 10))), //nolint:gosec
msgInFlight: ptr(int64(0)),
Expand Down
110 changes: 60 additions & 50 deletions sqsjobs/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/roadrunner-server/api/v4/plugins/v4/jobs"
"github.com/roadrunner-server/errors"
"go.uber.org/zap"

stderr "errors"
)

const (
Expand All @@ -25,6 +27,7 @@ const (
BinaryType string = "Binary"
ApproximateReceiveCount string = "ApproximateReceiveCount"
fifoSuffix string = ".fifo"
pipelineStoppedError string = "Failed to ACK/NACK or requeue the job. The pipeline is probably stopped."
)

var _ jobs.Job = (*Item)(nil)
Expand Down Expand Up @@ -58,6 +61,11 @@ type Options struct {
AutoAck bool `json:"auto_ack"`
// SQS Queue name
Queue string `json:"queue,omitempty"`
// If RetainFailedJobs is true, failed jobs will have their visibility timeout set to this value instead of the
// default VisibilityTimeout.
ErrorVisibilityTimeout int32 `json:"error_visibility_timeout,omitempty"`
// Whether to retain failed jobs on the queue. If true, jobs will not be deleted and re-queued on NACK.
RetainFailedJobs bool `json:"retain_failed_jobs,omitempty"`

// Private ================
cond *sync.Cond
Expand Down Expand Up @@ -126,7 +134,7 @@ func (i *Item) Context() ([]byte, error) {

func (i *Item) Ack() error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
return errors.Str(pipelineStoppedError)
}
defer func() {
i.Options.cond.Signal()
Expand All @@ -148,9 +156,9 @@ func (i *Item) Ack() error {
return nil
}

func (i *Item) Nack() error {
func (i *Item) commonNack(requeue bool, delay int) error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
return errors.Str(pipelineStoppedError)
}
defer func() {
i.Options.cond.Signal()
Comment on lines 158 to 164
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of defer here, we are now also calling this if requeue is true. This did not happen before. I am not sure if this was a bug/oversight before, or if it is important that requeue does not trigger this code:

defer func() {
		i.Options.cond.Signal()
		atomic.AddInt64(i.Options.msgInFlight, ^int64(0))
	}()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it should be called in all cases. This tells our internal counter to decrement the number of active messages in flight (aka consumed from listener and pushed to the priority queue). Since we're done with that message anyway (delete after pushing it back to SQS), it is OK to decrement that number.

Copy link
Contributor Author

@nickdnk nickdnk Oct 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so it was a bug that it was not called before, on these lines? Or is it a bug that we now always call it even when requeuing?

func (i *Item) NackWithOptions(requeue bool, delay int) error {
	if atomic.LoadUint64(i.Options.stopped) == 1 {
		return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
	}

	// message already deleted
	if i.Options.AutoAck {
		i.Options.cond.Signal()
		atomic.AddInt64(i.Options.msgInFlight, ^int64(0))
		return nil
	}

	if requeue {
                // it was not called in here!
		// requeue message
		err := i.Requeue(nil, delay)
		if err != nil {
			return err
		}
              
		return nil
	}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I guess that was a bug. I incremented the counter in listener.go, so after a requeue we still had +1 message, plus another one when the same message was read again in listener.go.

Expand All @@ -160,37 +168,6 @@ func (i *Item) Nack() error {
if i.Options.AutoAck {
return nil
}

// requeue message
err := i.Options.requeueFn(context.Background(), i)
if err != nil {
return err
}

_, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: i.Options.queue,
ReceiptHandle: i.Options.receiptHandler,
})

if err != nil {
return err
}

return nil
}

func (i *Item) NackWithOptions(requeue bool, delay int) error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
}

// message already deleted
if i.Options.AutoAck {
i.Options.cond.Signal()
atomic.AddInt64(i.Options.msgInFlight, ^int64(0))
return nil
}
nickdnk marked this conversation as resolved.
Show resolved Hide resolved

if requeue {
// requeue message
err := i.Requeue(nil, delay)
Expand All @@ -200,26 +177,57 @@ func (i *Item) NackWithOptions(requeue bool, delay int) error {

return nil
}
switch {
case !i.Options.RetainFailedJobs:
// requeue as new message
err := i.Options.requeueFn(context.Background(), i)
if err != nil {
return err
}
// Delete original message
_, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: i.Options.queue,
ReceiptHandle: i.Options.receiptHandler,
})

_, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: i.Options.queue,
ReceiptHandle: i.Options.receiptHandler,
})
if err != nil {
return err
}
case i.Options.ErrorVisibilityTimeout > 0:
// If error visibility is defined change the visibility timeout of the job that failed
_, err := i.Options.client.ChangeMessageVisibility(context.Background(), &sqs.ChangeMessageVisibilityInput{
QueueUrl: i.Options.queue,
ReceiptHandle: i.Options.receiptHandler,
VisibilityTimeout: i.Options.ErrorVisibilityTimeout,
})

if err != nil {
i.Options.cond.Signal()
atomic.AddInt64(i.Options.msgInFlight, ^int64(0))
return err
if err != nil {
var notInFlight *types.MessageNotInflight
// We ignore this error. If the message is not in flight, we cannot change the visibility. This may happen
// if processing takes longer than the timeout for the message, and no other workers pick it up. Should be
// very rare though.
if !stderr.As(err, &notInFlight) {
return err
}
}
default:
// don't do anything; wait for the VisibilityTimeout to expire.
rustatian marked this conversation as resolved.
Show resolved Hide resolved
}

i.Options.cond.Signal()
atomic.AddInt64(i.Options.msgInFlight, ^int64(0))
return nil
}

// Nack negatively acknowledges the job without requeueing it as a new message
// and with no delay. The resulting behavior (requeue-as-new vs. change of the
// SQS visibility timeout vs. leaving the message untouched) is decided by
// commonNack based on the pipeline's RetainFailedJobs and
// ErrorVisibilityTimeout options.
func (i *Item) Nack() error {
return i.commonNack(false, 0)
}

// NackWithOptions negatively acknowledges the job, delegating to commonNack.
// If requeue is true, the job is re-queued as a new message with the given
// delay (in seconds); otherwise the outcome depends on the pipeline's
// RetainFailedJobs and ErrorVisibilityTimeout options.
func (i *Item) NackWithOptions(requeue bool, delay int) error {
return i.commonNack(requeue, delay)
}

func (i *Item) Requeue(headers map[string][]string, delay int) error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
return errors.Str(pipelineStoppedError)
}

defer func() {
Expand Down Expand Up @@ -368,11 +376,13 @@ func (c *Driver) unpack(msg *types.Message) *Item {
Payload: []byte(getordefault(msg.Body)),
headers: h,
Options: &Options{
AutoAck: autoAck,
Delay: dl,
Priority: int64(priority),
Pipeline: (*c.pipeline.Load()).Name(),
Queue: getordefault(c.queue),
AutoAck: autoAck,
Delay: dl,
Priority: int64(priority),
Pipeline: (*c.pipeline.Load()).Name(),
Queue: getordefault(c.queue),
ErrorVisibilityTimeout: c.errorVisibilityTimeout,
RetainFailedJobs: c.retainFailedJobs,

// private
approxReceiveCount: recCount,
Expand Down
2 changes: 1 addition & 1 deletion tests/configs/.rr-sqs-attr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
test-1:
driver: sqs
config:
prefetch: 1000
prefetch: 10
visibility_timeout: 0
wait_time_seconds: 0
message_group_id: 'foo'
Expand Down
35 changes: 35 additions & 0 deletions tests/configs/.rr-sqs-error-visibility.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
version: '3'

rpc:
listen: tcp://127.0.0.1:6080

server:
command: "php php_test_files/jobs/jobs_nack.php"
relay: "pipes"
relay_timeout: "20s"

sqs: { }

logs:
level: debug
encoding: console
mode: development

jobs:
num_pollers: 1
pipeline_size: 100000
pool:
num_workers: 1
allocate_timeout: 60s
destroy_timeout: 60s
consume: [ "test-err-visibility" ]
pipelines:
test-err-visibility:
driver: sqs
config:
prefetch: 1
visibility_timeout: 900 # maximum for sqs
wait_time_seconds: 0
queue: default-error-timeout
retain_failed_jobs: true
error_visibility_timeout: 120 # takes ~60s to get consistent metadata from queue for test, so we need a long timeout
Loading
Loading