
feature: add support for ErrorVisibilityTimeout and job retention #571

Open · wants to merge 12 commits into master

Conversation

@nickdnk (Contributor) commented Oct 22, 2024

Reason for This PR

Currently it is not possible to nack a job without also deleting and re-queueing it. This is a problem if you use SQS with an automatic dead-letter queue: SQS's redrive policy looks at the receive count of a message and moves it to the dead-letter queue once the configured maximum is reached. Because RoadRunner deletes and recreates a job when it fails, its receive count is always 1, so SQS will never move it to the dead-letter queue; you would have to implement that logic manually.

Additionally, this lets you set a very long VisibilityTimeout (like 900s) while still using a much shorter timeout when a job fails, so it can be retried without waiting out the full default visibility timeout. If you set ErrorVisibilityTimeout in the config, the visibility timeout of a failed message is changed to the value of ErrorVisibilityTimeout.

This behavior is similar to how Elastic Beanstalk handles the worker environment for PHP. See https://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers.html, specifically the Error visibility timeout section.
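
For illustration, a pipeline configuration combining the new options with SQS's dead-letter redrive might look like the sketch below. Key names follow this PR's description (error_visibility_timeout, retain_jobs); the queue name and dead-letter ARN are placeholders, and RedrivePolicy is the standard SQS queue attribute, shown only to make the interaction concrete.

jobs:
  pipelines:
    example:
      driver: sqs
      config:
        queue: my-queue                 # placeholder
        visibility_timeout: 900         # generous timeout for slow jobs
        error_visibility_timeout: 30    # failed jobs become visible again after 30s
        retain_jobs: true               # keep failed jobs so SQS increments the receive count
        attributes:
          # Standard SQS redrive policy: after 3 receives, SQS moves the message
          # to the dead-letter queue (the ARN is a placeholder).
          RedrivePolicy: '{"deadLetterTargetArn":"arn:aws:sqs:us-east-1:123456789012:my-dlq","maxReceiveCount":"3"}'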

Description of Changes

  1. Added configuration options error_visibility_timeout (default 0) and retain_jobs (default false) to the SQS driver configuration. If retain_jobs is false, error_visibility_timeout does nothing. (A sketch of the resulting Go config fields follows this list.)
  2. Removed unused PHP test files and cleaned up the imports in those that are still used.
  3. Locked the OpenTelemetry Docker image to a fixed version, as the latest one no longer works with the tests (see open-telemetry/opentelemetry-collector#11337, "[announcement] logging exporter has been replaced with debug exporter").
  4. Refactored some of the Nack code to avoid code duplication. Please review it.
  5. Adjusted some tests.
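
In Go terms, the driver configuration grows roughly these fields; a sketch based on the description above and the review excerpts further down, not the exact diff. The mapstructure tag for the retention flag is an assumption: this description says retain_jobs, while later schema comments use retain_failed_jobs.

type Config struct {
	// ... existing SQS driver fields ...

	// ErrorVisibilityTimeout: if > 0 and RetainFailedJobs is true, RR changes the
	// visibility timeout of a failed job via ChangeMessageVisibility instead of
	// deleting and re-queueing it, so the SQS receive count keeps growing.
	ErrorVisibilityTimeout int32 `mapstructure:"error_visibility_timeout"`

	// RetainFailedJobs keeps failed jobs in the queue so the queue's redrive
	// policy (maxReceiveCount) can move them to the dead-letter queue.
	RetainFailedJobs bool `mapstructure:"retain_failed_jobs"` // tag name assumed
}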

License Acceptance

By submitting this pull request, I confirm that my contribution is made under
the terms of the MIT license.

PR Checklist

[Author TODO: Meet these criteria.]
[Reviewer TODO: Verify that these criteria are met. Request changes if not]

  • All commits in this PR are signed (git commit -s).
  • The reason for this PR is clearly provided (issue no. or explanation).
  • The description of changes is clear and encompassing.
  • Any required documentation changes (code and docs) are included in this PR.
  • Any user-facing changes are mentioned in CHANGELOG.md. (there is no changelog?)
  • All added/changed functionality is tested.

Summary by CodeRabbit

  • New Features

    • Introduced new configuration options for job visibility and retention in SQS, allowing for better management of failed jobs.
    • Added a new test for error handling related to job visibility timeouts.
  • Bug Fixes

    • Enhanced assertions in existing tests for clarity and accuracy.
  • Documentation

    • Updated configuration files to reflect new prefetch settings across multiple job pipelines.
  • Chores

    • Removed outdated or unnecessary PHP files related to job processing.

@nickdnk (Contributor, Author) commented Oct 22, 2024

Looks like the SQS tests still fail because of missing access to environment secrets. At least it's the same error as I encountered previously and it runs fine locally with my own test keys.

@nickdnk force-pushed the master branch 2 times, most recently from bf53022 to 8e30a83, on October 22, 2024 at 23:10
codecov bot commented Oct 22, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 31.26%. Comparing base (ba071d2) to head (ca95599).
Report is 5 commits behind head on master.

Additional details and impacted files
@@           Coverage Diff           @@
##           master     #571   +/-   ##
=======================================
  Coverage   31.26%   31.26%           
=======================================
  Files          14       14           
  Lines        1724     1724           
=======================================
  Hits          539      539           
  Misses       1142     1142           
  Partials       43       43           


@nickdnk added the enhancement (New feature or request) and go (Pull requests that update Go code) labels on Oct 22, 2024
@nickdnk self-assigned this on Oct 22, 2024
@nickdnk marked this pull request as ready for review on October 23, 2024 at 00:06
coderabbitai bot commented Oct 23, 2024

Walkthrough

The pull request introduces enhancements to the SQS job handling configurations across multiple files. Key additions include new constants and fields in sqsjobs/config.go to manage job visibility and retention policies. The Driver struct is updated to incorporate these fields, and the Options struct in sqsjobs/item.go is modified to include error visibility timeout and retention settings. Several configuration files are updated to adjust the prefetch parameter, and new tests are added to validate the changes. Additionally, some PHP files are deleted or modified, primarily focusing on error handling adjustments.

Changes

  • sqsjobs/config.go: Added constants errorVisibilityTimeout, retainFailedJobs, maxVisibilityTimeout; added fields ErrorVisibilityTimeout, RetainFailedJobs to the Config struct; updated the InitDefault method for boundary checks.
  • sqsjobs/driver.go: Added fields errorVisibilityTimeout, retainFailedJobs to the Driver struct; updated the FromConfig and FromPipeline methods to initialize the new fields.
  • sqsjobs/item.go: Updated the Options struct with new fields ErrorVisibilityTimeout, RetainFailedJobs; replaced the Nack method with commonNack; updated the Ack and NackWithOptions methods.
  • tests/configs/.rr-sqs-attr.yaml: Updated prefetch from 1000 to 10 for the test-1 pipeline.
  • tests/configs/.rr-sqs-error-visibility.yaml: New configuration file with settings for error visibility and job retention.
  • tests/configs/.rr-sqs-init-br.yaml: Updated prefetch from 1000 to 10 for the test-1 and test-2 pipelines.
  • tests/configs/.rr-sqs-init-br_fifo.yaml: Updated prefetch from 1000 to 10 for the test-1 and test-2 pipelines.
  • tests/configs/.rr-sqs-init.yaml: Updated prefetch from 1000 to 10 for the test-1 and test-2 pipelines.
  • tests/configs/.rr-sqs-init_fifo.yaml: Updated prefetch from 1000 to 10 for the test-1 and test-2 pipelines.
  • tests/configs/.rr-sqs-otel.yaml: Updated prefetch from 1000 to 10 for the test-1 pipeline.
  • tests/configs/.rr-sqs-pq.yaml: Updated prefetch from 1000 to 10 for the test-1-pq and test-2-pq pipelines.
  • tests/configs/.rr-sqs-raw.yaml: Updated prefetch from 1000 to 10 for the test-raw pipeline.
  • tests/jobs_sqs_durability_test.go: Updated assertions in TestDurabilitySQS for clarity.
  • tests/jobs_sqs_test.go: Added new test function TestSQSErrorVisibilityTimeout to validate visibility timeout behavior.
  • tests/php_test_files/jobs/jobs_bad_resp.php: Removed a commented-out variable declaration and updated error handling in the catch clause.
  • tests/php_test_files/jobs/jobs_create_memory.php: Deleted file.
  • tests/php_test_files/jobs/jobs_err.php: Simplified error handling by removing the try-catch block in the task processing loop.
  • tests/php_test_files/jobs/jobs_nack.php: New file for a job consumer with failure handling.
  • tests/php_test_files/jobs/jobs_ok.php: Removed error handling around task acknowledgment.
  • tests/php_test_files/jobs/jobs_ok_pq.php: Removed error handling around task acknowledgment.
  • tests/php_test_files/jobs/jobs_ok_queue_name_exist.php: Deleted file.
  • tests/php_test_files/jobs/jobs_ok_sleep1.php: Removed error handling around task acknowledgment.
  • tests/php_test_files/jobs/jobs_ok_slow.php: Deleted file.
  • tests/php_test_files/jobs/jobs_ok_slow_rand.php: Deleted file.
  • tests/php_test_files/jobs/jobs_send.php: Deleted file.
  • tests/env/otel-collector-config.yml: Updated exporter from logging to debug in the configuration.


Poem

🐇 In the land of SQS, where jobs do play,
New settings emerge, brightening the day.
With visibility tweaks and failures retained,
Our bunnies hop high, no longer constrained!
So here's to the changes, both big and small,
In the garden of code, we flourish for all! 🌼



@coderabbitai (bot) left a comment

Actionable comments posted: 8

🧹 Outside diff range and nitpick comments (17)
tests/php_test_files/jobs/jobs_nack.php (2)

1-4: Consider adding PHP strict types declaration.

For better type safety in test files, consider adding strict types declaration.

 <?php
+declare(strict_types=1);

1-10: Consider adding test cases for different failure scenarios.

While this test file covers the basic nack functionality, consider adding additional test cases to verify:

  1. Multiple consecutive failures
  2. Different error messages
  3. Behavior with the new error visibility timeout
  4. Behavior with job retention enabled/disabled

Would you like me to help create additional test cases to cover these scenarios?

tests/env/docker-compose-otel.yaml (1)

3-3: LGTM! Good practice to lock the OpenTelemetry collector version.

Locking to a specific version (0.110.0) instead of using the default/latest tag is a good practice for ensuring consistent behavior across environments and preventing compatibility issues.

Consider adding a comment above the image declaration to document why this specific version was chosen and any known compatibility constraints. This will help future maintainers understand the version choice rationale.

  collector:
+   # Locked to 0.110.0 due to compatibility issues with newer versions
    image: otel/opentelemetry-collector-contrib:0.110.0
tests/configs/.rr-sqs-error-visibility.yaml (3)

7-7: Consider using an absolute path for the command.

Using a relative path for the PHP test file could make the configuration brittle if executed from different working directories.

Consider using an absolute path or an environment variable:

-  command: "php php_test_files/jobs/jobs_nack.php"
+  command: "php ${APP_ROOT}/php_test_files/jobs/jobs_nack.php"

19-20: Consider reducing pipeline size for tests.

A pipeline size of 100,000 seems excessive for a test configuration. Consider reducing this to a more reasonable value for testing purposes.

-  pipeline_size: 100000
+  pipeline_size: 100

34-35: Document error visibility timeout behavior.

The comment about metadata consistency timing should be expanded to explain:

  1. Why it takes ~60s to get consistent metadata
  2. Why 90s was chosen as the timeout value
  3. What metadata is being referenced

This information would be valuable for future maintenance.

🧰 Tools
🪛 yamllint

[error] 35-35: no new line character at the end of file

(new-line-at-end-of-file)

tests/configs/.rr-sqs-raw.yaml (1)

Based on the gathered information, I can now provide a final response:

Test configuration needs to be updated to cover new features

The test configuration in .rr-sqs-raw.yaml should be updated because:

  • There's a test case TestSQSErrorVisibilityTimeout in tests/jobs_sqs_test.go but no corresponding configuration to test this feature
  • The current configuration has visibility_timeout: 0 at the pipeline level but doesn't include the new error_visibility_timeout setting
  • While it has message retention settings under attributes, it doesn't include the new retain_jobs feature

Suggested updates:

  • Add error_visibility_timeout setting to test its interaction with the existing visibility_timeout
  • Add retain_jobs configuration to ensure proper testing of the job retention feature
  • Consider adding a new pipeline configuration to test different combinations of these settings
🔗 Analysis chain

Line range hint 37-41: Verify if test configuration needs updates for new features.

Given that this PR adds support for ErrorVisibilityTimeout and job retention, consider if this test configuration should be updated to include test cases for these new features. The current visibility_timeout: 0 setting might need to be reviewed in context of the new ErrorVisibilityTimeout functionality.

Let's check if other test configurations have been updated with the new features:


Let me try a different approach to verify this by checking the test configurations more thoroughly and also looking for related test cases.

This will help us understand:

  1. What test configurations exist
  2. Their current content
  3. If there are any test cases covering these features

Let me check one more thing to see if there are any existing test configurations that might be relevant to these features.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if any test configs include the new ErrorVisibilityTimeout or job retention settings
# Expected: Find configurations that test the new features

echo "Checking test configurations for new feature settings..."
rg -l 'error_visibility_timeout|retain_jobs' 'tests/configs/'

Length of output: 184


Script:

#!/bin/bash
# Let's first see all available test configurations
echo "Available test configurations:"
fd . tests/configs/ -e yaml -e yml

# Then let's examine the content of these files
echo -e "\nChecking configuration content:"
for f in $(fd . tests/configs/ -e yaml -e yml); do
  echo -e "\n=== $f ==="
  cat "$f"
done

# Let's also check for any test files that might be testing these features
echo -e "\nChecking test files for related tests:"
rg -i "visibility.*timeout|retain.*job" tests/

Length of output: 737


Script:

#!/bin/bash
# Let's specifically look at the content of .rr-sqs-raw.yaml and any similar SQS configs
echo "=== Content of .rr-sqs-raw.yaml ==="
cat tests/configs/.rr-sqs-raw.yaml

echo -e "\n=== Looking for other SQS-related test configs ==="
fd sqs tests/configs/ -e yaml -e yml --exec cat {}

Length of output: 1231

tests/configs/.rr-sqs-attr.yaml (1)

Line range hint 35-44: Consider adding new configuration options for comprehensive testing.

Since this PR introduces error_visibility_timeout and retain_jobs features, consider adding these configurations to the test file to ensure complete coverage of the new functionality.

Add the new options to the config:

   config:
     prefetch: 10
     visibility_timeout: 0
     wait_time_seconds: 0
     message_group_id: 'foo'
     queue: default.fifo
+    error_visibility_timeout: 0
+    retain_jobs: false
     attributes:
       FifoQueue: 'true'
       MaximumMessageSize: 262144
tests/configs/.rr-sqs-otel.yaml (1)

Line range hint 43-54: Consider adding new configuration options for error handling.

Based on the PR objectives, this configuration file should include the new error_visibility_timeout and retain_jobs options to support the enhanced job failure handling features.

Consider adding these options to the pipeline configuration:

   config:
     prefetch: 10
     visibility_timeout: 0
     wait_time_seconds: 0
     queue: default
+    error_visibility_timeout: 0
+    retain_jobs: false
     attributes:
       DelaySeconds: 0
tests/configs/.rr-sqs-pq.yaml (1)

33-33: Consider documenting the rationale for reduced prefetch.

While the reduced prefetch value is appropriate for testing, it would be helpful to add a comment explaining why 10 was chosen as the optimal value for test scenarios.

Add a comment above each prefetch configuration:

    test-1-pq:
      driver: sqs
      config:
+        # Reduced prefetch for better test isolation and memory efficiency
        prefetch: 10

    test-2-pq:
      driver: sqs
      config:
+        # Reduced prefetch for better test isolation and memory efficiency
        prefetch: 10

Also applies to: 49-49

tests/configs/.rr-sqs-init-br.yaml (1)

Line range hint 34-57: Consider adding new configuration parameters for complete feature testing.

Since this PR introduces error_visibility_timeout and retain_jobs features, consider adding these configurations to both pipelines to ensure comprehensive testing of the new functionality.

Example addition for test-1 pipeline:

     config:
       prefetch: 10
       visibility_timeout: 0
       wait_time_seconds: 0
       queue: default
+      error_visibility_timeout: 30
+      retain_jobs: true
       attributes:

And for test-2 pipeline:

     config:
       prefetch: 10
       queue: default-2
+      error_visibility_timeout: 0
+      retain_jobs: false
       attributes:
tests/configs/.rr-sqs-init-br_fifo.yaml (1)

52-52: Consider documenting prefetch value standardization.

The consistent reduction of prefetch to 10 across both pipelines suggests a deliberate standardization. Consider adding a comment explaining the rationale for this specific value, especially since it's used across multiple test configurations.

Add a comment above the prefetch configuration:

    test-2:
      driver: sqs
      config:
+        # Standardized prefetch value for consistent test behavior
        prefetch: 10
tests/jobs_sqs_durability_test.go (1)

Based on the code analysis, I can now provide a final response:

Additional durability tests needed for error visibility and job retention features

The current durability test doesn't cover the new ErrorVisibilityTimeout and RetainFailedJobs features under network disruption. These features are particularly sensitive to network issues because they rely on AWS SQS API calls:

  • ChangeMessageVisibility is called when ErrorVisibilityTimeout is set
  • Message deletion behavior changes based on RetainFailedJobs setting
  • Both features affect message lifecycle during error scenarios

Key points to test:

  • Verify ChangeMessageVisibility calls succeed after network recovery
  • Ensure retained jobs remain in queue during network outages
  • Validate visibility timeout changes persist through reconnection
  • Check message deletion behavior with RetainFailedJobs during network issues

The implementation shows these features are critical for message lifecycle management, but their behavior under network disruption isn't verified. The durability test should be extended to cover these scenarios since they directly impact message handling during failure conditions.

🔗 Analysis chain

Line range hint 1-127: Consider testing new features in durability scenarios.

Given that this PR adds ErrorVisibilityTimeout and RetainJobs features, consider whether these should be tested under network disruption scenarios. For example, we might want to verify that jobs are properly retained and visibility timeouts are respected even during reconnection attempts.

Let's check if there are other tests covering these scenarios:

If these features aren't tested under network disruption, consider adding test cases or document why they're not needed in this specific test.


Let's examine the test coverage for these features more thoroughly.


Based on the search results, I can see that ErrorVisibilityTimeout is already tested in a dedicated test function TestSQSErrorVisibilityTimeout, but it's tested in isolation rather than under network disruption conditions. Let me check for the actual configuration files in the repository root.


Let me check the actual implementation of these features to better understand their behavior under network disruption.


Let me check one more thing to understand how these features are handled during message processing.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for tests related to the new features
echo "Searching for ErrorVisibilityTimeout tests..."
rg -l "ErrorVisibilityTimeout" "tests/"

echo "Searching for RetainJobs tests..."
rg -l "RetainJobs|RetainFailedJobs" "tests/"

Length of output: 283


Script:

#!/bin/bash
# Check the content of the SQS test file that mentions ErrorVisibilityTimeout
echo "Content of jobs_sqs_test.go around ErrorVisibilityTimeout:"
rg -B 3 -A 10 "ErrorVisibilityTimeout" "tests/jobs_sqs_test.go"

# Look for any configuration files that might set these parameters
echo -e "\nChecking configuration files:"
fd -e yaml -e yml . configs/ -x grep -l "ErrorVisibilityTimeout\|RetainJobs\|RetainFailedJobs" {}

# Check the durability test configuration
echo -e "\nContent of durability test configuration:"
cat "configs/.rr-sqs-durability-redial.yaml"

Length of output: 2111


Script:

#!/bin/bash
# List all yaml files in the repository to find the correct config path
echo "Finding all YAML files:"
fd -e yaml -e yml

# Check the content of the error visibility timeout test config
echo -e "\nSearching for error visibility timeout configuration:"
rg -l "error_visibility_timeout" 

# Check the content of any file that might contain retain jobs configuration
echo -e "\nSearching for retain jobs configuration:"
rg -l "retain.*jobs"

Length of output: 531


Script:

#!/bin/bash
# Check the implementation in item.go and config.go
echo "Content of sqsjobs/item.go around error visibility timeout and retain jobs:"
rg -B 3 -A 10 "error_visibility_timeout|retain.*jobs" sqsjobs/item.go

echo -e "\nContent of sqsjobs/config.go around these features:"
rg -B 3 -A 10 "error_visibility_timeout|retain.*jobs" sqsjobs/config.go

# Check if there are any reconnection/error handling related to these features
echo -e "\nChecking error handling and reconnection logic:"
rg -B 3 -A 10 "ErrorVisibilityTimeout|RetainJobs" sqsjobs/consumer.go

Length of output: 3199


Script:

#!/bin/bash
# Look for the message processing and error handling logic
echo "Searching for message processing and error handling in sqsjobs:"
rg -l "ChangeMessageVisibility|DeleteMessage" sqsjobs/

# Check the implementation files
echo -e "\nChecking implementation around message visibility and retention:"
rg -B 3 -A 10 "ChangeMessageVisibility|DeleteMessage" sqsjobs/

Length of output: 5219

sqsjobs/config.go (1)

Line range hint 143-187: Initialize new configuration fields.

The InitDefault method should set default values for the new fields to match the documented behavior:

  • ErrorVisibilityTimeout should default to 0
  • RetainFailedJobs should default to false

Add initialization in the InitDefault method:

func (c *Config) InitDefault() {
	if c.Queue == nil {
		c.Queue = aws.String("default")
	}

+	// Initialize new fields with documented defaults
+	if c.ErrorVisibilityTimeout < 0 {
+		c.ErrorVisibilityTimeout = 0
+	}
+
+	// RetainFailedJobs defaults to false, so no initialization needed
+	// as bool zero value is false

	// ... rest of the method
}
sqsjobs/driver.go (2)

61-66: Add documentation for the new fields.

Consider adding documentation comments for the new fields to explain their purpose:

  • errorVisibilityTimeout: Explain how it affects failed job visibility
  • retainFailedJobs: Describe its impact on job retention behavior
 	// connection info
 	queue                  *string
 	messageGroupID         string
 	waitTime               int32
 	visibilityTimeout      int32
+	// errorVisibilityTimeout controls the visibility timeout for failed jobs (in seconds)
 	errorVisibilityTimeout int32
+	// retainFailedJobs when true prevents failed jobs from being deleted and requeued
 	retainFailedJobs       bool

Line range hint 1-600: Consider implementing a configuration validator interface.

The validation logic for configuration parameters is duplicated between FromConfig and FromPipeline. Consider implementing a validator interface to centralize the validation logic and ensure consistency.

Example approach:

type configValidator interface {
    Validate() error
}

type driverConfig struct {
    errorVisibilityTimeout int32
    retainFailedJobs      bool
    // ... other fields
}

func (c *driverConfig) Validate() error {
    if c.errorVisibilityTimeout < 0 {
        return errors.Errorf("error visibility timeout must be non-negative, got: %d", c.errorVisibilityTimeout)
    }
    if c.errorVisibilityTimeout > 43200 {
        return errors.Errorf("error visibility timeout must not exceed 43200 seconds (12 hours), got: %d", c.errorVisibilityTimeout)
    }
    // ... other validations
    return nil
}

This would allow both FromConfig and FromPipeline to use the same validation logic:

cfg := &driverConfig{
    errorVisibilityTimeout: conf.ErrorVisibilityTimeout,
    retainFailedJobs: conf.RetainFailedJobs,
}
if err := cfg.Validate(); err != nil {
    return nil, errors.E(op, err)
}
tests/jobs_sqs_test.go (1)

684-685: Consider extracting timing constants.

The test uses magic numbers for sleep durations. Consider extracting these into named constants to improve maintainability and make the timing requirements more explicit.

+const (
+    // Time to wait for all jobs to NACK
+    nackWaitTime = 10 * time.Second
+    // Minimum time required for queue metadata to update according to AWS docs
+    queueMetadataUpdateTime = 60 * time.Second
+    // Additional buffer time for queue metadata update
+    queueMetadataUpdateBuffer = 10 * time.Second
+)

Also applies to: 693-695

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 01a82a8 and 2930aae.

📒 Files selected for processing (26)
  • sqsjobs/config.go (2 hunks)
  • sqsjobs/driver.go (3 hunks)
  • sqsjobs/item.go (7 hunks)
  • tests/configs/.rr-sqs-attr.yaml (1 hunks)
  • tests/configs/.rr-sqs-error-visibility.yaml (1 hunks)
  • tests/configs/.rr-sqs-init-br.yaml (2 hunks)
  • tests/configs/.rr-sqs-init-br_fifo.yaml (2 hunks)
  • tests/configs/.rr-sqs-init.yaml (2 hunks)
  • tests/configs/.rr-sqs-init_fifo.yaml (2 hunks)
  • tests/configs/.rr-sqs-otel.yaml (1 hunks)
  • tests/configs/.rr-sqs-pq.yaml (2 hunks)
  • tests/configs/.rr-sqs-raw.yaml (1 hunks)
  • tests/env/docker-compose-otel.yaml (1 hunks)
  • tests/jobs_sqs_durability_test.go (1 hunks)
  • tests/jobs_sqs_test.go (3 hunks)
  • tests/php_test_files/jobs/jobs_bad_resp.php (1 hunks)
  • tests/php_test_files/jobs/jobs_create_memory.php (0 hunks)
  • tests/php_test_files/jobs/jobs_err.php (1 hunks)
  • tests/php_test_files/jobs/jobs_nack.php (1 hunks)
  • tests/php_test_files/jobs/jobs_ok.php (1 hunks)
  • tests/php_test_files/jobs/jobs_ok_pq.php (1 hunks)
  • tests/php_test_files/jobs/jobs_ok_queue_name_exist.php (0 hunks)
  • tests/php_test_files/jobs/jobs_ok_sleep1.php (1 hunks)
  • tests/php_test_files/jobs/jobs_ok_slow.php (0 hunks)
  • tests/php_test_files/jobs/jobs_ok_slow_rand.php (0 hunks)
  • tests/php_test_files/jobs/jobs_send.php (0 hunks)
💤 Files with no reviewable changes (5)
  • tests/php_test_files/jobs/jobs_create_memory.php
  • tests/php_test_files/jobs/jobs_ok_queue_name_exist.php
  • tests/php_test_files/jobs/jobs_ok_slow.php
  • tests/php_test_files/jobs/jobs_ok_slow_rand.php
  • tests/php_test_files/jobs/jobs_send.php
🧰 Additional context used
🪛 yamllint
tests/configs/.rr-sqs-error-visibility.yaml

[error] 35-35: no new line character at the end of file

(new-line-at-end-of-file)

🔇 Additional comments (22)
tests/php_test_files/jobs/jobs_ok.php (1)

9-9: Verify the intentional removal of error handling.

While this appears to be a "happy path" test file, removing all error handling around ack() might make debugging more difficult if issues occur. Consider:

  1. Is the removal of the try-catch block intentional as part of the new error handling strategy?
  2. Are acknowledgment errors being tested elsewhere (e.g., in error-specific test files)?

If this is intentional, consider adding a comment explaining why error handling was removed to help future maintainers understand the design decision.

✅ Verification successful

Let me check the error handling patterns in the error-specific test file to make a more informed decision.


Removal of error handling is consistent with the test design

After reviewing both test files, the removal of try-catch around ack() is appropriate because:

  1. jobs_ok.php is a minimal test case specifically for the happy path scenario
  2. jobs_err.php handles error cases through retries and acknowledgment logic based on attempt count
  3. Both files follow the same pattern of direct ack() calls without try-catch blocks
  4. The error handling has been moved to a higher level in the actual implementation, making these test files cleaner and more focused on their specific scenarios
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for error handling in other test files
# Expected: Find error handling patterns in error test files

echo "Checking for error handling in other test files..."
rg -l "try.*catch.*ack" tests/php_test_files/jobs/
echo "Checking for specific error test files..."
fd "jobs_err" tests/php_test_files/jobs/

Length of output: 336


Script:

#!/bin/bash
# Check the content of jobs_err.php to understand error handling patterns
echo "Content of jobs_err.php:"
cat tests/php_test_files/jobs/jobs_err.php

# Also check the content of jobs_ok.php for context
echo -e "\nContent of jobs_ok.php:"
cat tests/php_test_files/jobs/jobs_ok.php

Length of output: 871

tests/php_test_files/jobs/jobs_ok_sleep1.php (1)

9-10: Verify the removal of error handling.

Given that this test file appears to be testing the happy path (as suggested by ok in the filename), the removal of the try-catch block might be intentional. However, since the PR focuses on enhancing error handling capabilities, please verify:

  1. Is this test specifically designed to verify successful job processing without error handling?
  2. Should we maintain error logging for debugging purposes, even in success path tests?

Let's check if there are complementary error handling tests:

Consider adding a comment to document the test's specific purpose and why error handling is intentionally omitted.

✅ Verification successful

Let me gather more information about these test files to understand the error handling patterns and verify if the removal of try-catch is consistent with the test suite design.

This will help us understand:

  1. How error cases are handled in dedicated error test files
  2. Whether the absence of try-catch in success path tests is a consistent pattern
  3. The overall test suite design for job processing

Based on the test files content, I can now generate the final response as I have enough context to verify the concern about error handling removal.

Removal of try-catch block is consistent with test suite design

The test suite follows a clear pattern where:

  • Success path tests (jobs_ok.php, jobs_ok_sleep1.php) deliberately omit try-catch blocks and directly perform operations
  • Error handling tests (jobs_err.php, jobs_nack.php) have dedicated files that specifically test failure scenarios
  • Each test file has a focused purpose: success paths, error paths, or specific behaviors (like delays)

The removal of the try-catch block in jobs_ok_sleep1.php aligns with this pattern and makes the test's intention clearer - it's specifically testing the successful execution path with a sleep delay.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Look for related test files that handle error cases
# Expected: Find test files that specifically test error scenarios

echo "Searching for related job test files:"
fd "jobs.*\.php$" tests/php_test_files/jobs -x echo "{}"

Length of output: 390


Script:

#!/bin/bash
# Check content of error handling test files and the ok test file
echo "=== Content of jobs_err.php ==="
cat tests/php_test_files/jobs/jobs_err.php

echo -e "\n=== Content of jobs_nack.php ==="
cat tests/php_test_files/jobs/jobs_nack.php

echo -e "\n=== Content of jobs_ok.php ==="
cat tests/php_test_files/jobs/jobs_ok.php

echo -e "\n=== Content of jobs_bad_resp.php ==="
cat tests/php_test_files/jobs/jobs_bad_resp.php

Length of output: 1770

tests/php_test_files/jobs/jobs_ok_pq.php (2)

9-9: Verify the sleep duration's purpose.

The 15-second sleep appears to be testing queue visibility timeout behavior. Given that this PR introduces new error visibility timeout features, we should ensure this test duration aligns with those changes.

Let's check for other timeout-related configurations:

#!/bin/bash
# Description: Look for timeout configurations to ensure consistency
# Search for timeout-related values in test and config files

rg -g '!{vendor,node_modules}/*' -A 3 "(timeout|sleep)\s*[=:]\s*[0-9]+"

9-10: ⚠️ Potential issue

Consider maintaining error handling in test files.

The removal of the try-catch block around task acknowledgment makes this test more brittle and harder to debug. While the "ok" suffix suggests this is a happy path test, proper error handling in tests helps with:

  1. Debugging when tests fail
  2. Understanding failure scenarios
  3. Maintaining test stability

Let's check if other test files follow similar patterns:

Consider restoring error handling:

 while ($task = $consumer->waitTask()) {
-    sleep(15);
-    $task->ack();
+    try {
+        sleep(15);
+        $task->ack();
+    } catch (Exception $e) {
+        // Log error for debugging
+        fwrite(STDERR, "Error processing task: " . $e->getMessage() . "\n");
+        throw $e;
+    }
 }
tests/php_test_files/jobs/jobs_nack.php (2)

6-6: LGTM: Consumer initialization is correct.

The consumer is properly instantiated from the Spiral\RoadRunner\Jobs namespace.


8-10: Verify error message propagation to SQS.

The implementation looks correct for testing the nack functionality. However, let's verify that the error message 'Job failed!' is properly propagated to SQS and can be used for debugging in the deadletter queue.

tests/php_test_files/jobs/jobs_bad_resp.php (1)

12-16: LGTM! Clean error handling implementation.

The removal of the leading backslash from Throwable aligns with modern PHP practices, as the use of fully qualified class names is unnecessary when proper use statements are in place. The error handling implementation remains functionally identical.

tests/php_test_files/jobs/jobs_err.php (1)

12-16: Review retry logic implementation for SQS deadletter queue compatibility

The current implementation might prevent proper functioning of SQS's deadletter queue feature. When a job reaches maximum attempts, it's explicitly acknowledged (deleted) instead of being allowed to naturally transition to the deadletter queue through SQS's retry count mechanism.

Consider modifying the logic to:

  1. Remove explicit acknowledgment after max attempts to allow SQS to handle the deadletter queue transition
  2. Utilize the new retain_jobs configuration to preserve failed jobs
  3. Leverage error_visibility_timeout for retry timing instead of hardcoded 5-second delay
-    if ($total_attempts > 3) {
-        $task->ack();
-    } else {
-        $task->withHeader("attempts", $total_attempts)->withDelay(5)->requeue("failed");
-    }
+    // Allow SQS to handle deadletter queue transition
+    $task->withHeader("attempts", $total_attempts)->requeue("failed");

Let's verify the SQS configuration:

tests/configs/.rr-sqs-error-visibility.yaml (2)

13-16: LGTM!

Logging configuration is appropriate for a test environment with debug level and development mode enabled.


31-31: Verify SQS maximum visibility timeout.

The comment states this is the maximum for SQS, but AWS documentation should be referenced to confirm this value.

tests/configs/.rr-sqs-raw.yaml (1)

37-37: LGTM! Reduced prefetch is more suitable for testing.

The reduction of prefetch from 1000 to 10 messages is a good change for test configurations, as it makes test behavior more predictable and easier to debug.

tests/configs/.rr-sqs-attr.yaml (1)

35-35: LGTM! Verify consistency across test configurations.

The reduction in prefetch size from 1000 to 10 is reasonable for testing scenarios and provides better control over message processing.

Let's verify the consistency of this change across other test configurations:

tests/configs/.rr-sqs-otel.yaml (1)

43-43: Verify the impact of reduced prefetch value.

The significant reduction in prefetch value from 1000 to 10 messages could impact processing throughput. While this change might improve error handling granularity and memory usage, it's important to ensure it meets your performance requirements.

Consider documenting the rationale for this reduction and its performance implications in the PR description.

tests/configs/.rr-sqs-pq.yaml (1)

33-33: LGTM! Verify consistency across test configurations.

The reduction of prefetch values from 1000 to 10 is appropriate for testing scenarios and aligns with the PR's focus on job failure handling.

Let's verify the consistency of prefetch values across all test configurations:

Also applies to: 49-49

tests/configs/.rr-sqs-init.yaml (2)

35-35: LGTM! Reduced prefetch value is more suitable for testing.

The reduction of prefetch from 1000 to 10 messages is appropriate for a test configuration, especially when testing error handling and visibility timeout features. A smaller batch size provides better control and predictability in test scenarios.


51-51: LGTM! Consistent prefetch configuration across pipelines.

The prefetch reduction to 10 in the second pipeline maintains consistency with the first pipeline, ensuring uniform behavior across both test queues.

tests/configs/.rr-sqs-init-br.yaml (1)

35-35: LGTM: Prefetch reduction improves test reliability.

The reduction in prefetch value from 1000 to 10 is a good change that will make the tests more reliable and predictable, especially when testing error handling and deadletter queue functionality.

Also applies to: 51-51

tests/configs/.rr-sqs-init_fifo.yaml (1)

34-34: LGTM! Reduced prefetch values are more appropriate for testing.

The reduction in prefetch values from 1000 to 10 is a positive change as it:

  • Prevents overwhelming the test environment
  • Aligns better with SQS quotas and limits
  • Is more suitable for testing error handling scenarios

Let's verify if other test configurations are consistent:

Also applies to: 52-52

✅ Verification successful

Let me try a different approach to verify the prefetch values in the SQS configurations.


Let me try one more time with a simpler approach using cat to directly examine the file in question and related SQS config files.


Let me check one more specific file that seems most relevant to prefetch settings.


Based on the gathered information, I can now provide a final response:

Prefetch value change is appropriate and consistent with test scenarios

The reduction from 1000 to 10 in .rr-sqs-init_fifo.yaml is well-justified because:

  • The codebase has dedicated test configurations for different prefetch scenarios (.rr-sqs-init_fifo-prefetch.yaml uses prefetch: 1)
  • The value of 10 is reasonable for general test cases, while still being large enough to test concurrent processing
  • It aligns with the PR's focus on error handling and job retention features
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check prefetch values across all SQS test configurations
# Expect: All SQS test configs should use the same prefetch value for consistency

echo "Checking prefetch values in SQS test configurations..."
rg -A 1 "driver: sqs" tests/configs/ | rg -A 1 "prefetch:"

Length of output: 177


Script:

#!/bin/bash
# Let's directly look for SQS configuration files and their content
echo "Listing all YAML files in tests/configs directory:"
fd -e yaml -e yml . tests/configs/

echo -e "\nChecking content of SQS-related config files:"
for f in $(fd -e yaml -e yml . tests/configs/); do
  echo -e "\n=== $f ==="
  grep -A 5 "driver:" "$f" || true
done

Length of output: 292


Script:

#!/bin/bash
# First, let's examine the file mentioned in the review
echo "=== Content of .rr-sqs-init_fifo.yaml ==="
cat tests/configs/.rr-sqs-init_fifo.yaml

echo -e "\n=== Looking for other SQS config files ==="
find tests/configs -type f -name "*sqs*.yaml" -o -name "*sqs*.yml"

Length of output: 2071


Script:

#!/bin/bash
echo "=== Content of .rr-sqs-init_fifo-prefetch.yaml ==="
cat tests/configs/.rr-sqs-init_fifo-prefetch.yaml

Length of output: 1359

tests/configs/.rr-sqs-init-br_fifo.yaml (1)

34-34: Verify impact of reduced prefetch value on test-1 pipeline.

The reduction in prefetch from 1000 to 10 messages could significantly affect message processing throughput. While this change aligns with testing error scenarios more reliably, we should verify it doesn't impact other test cases.

Run this script to check for test dependencies on prefetch behavior:

✅ Verification successful

Let me analyze the results and gather more context about the test configurations to make a conclusive verification.

The initial results show that:

  1. There's a dedicated test for prefetch behavior (TestSQSPrefetch)
  2. The default prefetch value is 10 if not specified
  3. Tests verify prefetch limit behavior
  4. The driver respects the prefetch configuration

Let me check the specific test implementations to understand if the reduced prefetch value aligns with the test requirements.


Based on the comprehensive test results and configurations, I can now provide a final response about the prefetch value change.

Reduced prefetch value is intentional and verified by tests

The prefetch value of 10 is actually the default value in the codebase (as seen in sqsjobs/config.go), and there's a dedicated test TestSQSPrefetch that specifically verifies:

  • The prefetch limit behavior
  • Concurrent message processing (10 concurrent pushes)
  • Proper handling of prefetch limits with assertions for "prefetch limit was reached" messages
  • Complete message processing (all 20 messages are processed successfully)

The test configuration .rr-sqs-init-br_fifo.yaml aligns with this validated behavior, and the reduced prefetch value is appropriate for:

  1. Consistent test behavior
  2. Better control over message flow
  3. Proper verification of error handling scenarios
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Search for tests that might depend on prefetch behavior
# Expected: Tests should handle both high and low prefetch values

# Look for test files referencing the test-1 queue
rg -l "default-br-1.fifo" --type php --type go

# Look for prefetch-related test assertions
rg -i "prefetch|batch" --type php --type go

Length of output: 1060


Script:

#!/bin/bash
# Check the test configuration files and test implementations
cat tests/configs/.rr-sqs-init_fifo-prefetch.yaml
cat tests/jobs_sqs_fifo_test.go

Length of output: 12991

tests/jobs_sqs_durability_test.go (1)

126-127: LGTM! Improved assertion messages.

The updated assertions provide clearer error messages and follow Go testing best practices by putting the expected value first.

sqsjobs/config.go (1)

10-19: LGTM! Constants are well-defined.

The new constants follow the existing naming convention and are properly organized with related configuration keys.

sqsjobs/driver.go (1)

202-216: ⚠️ Potential issue

Add validation and verify constant definitions.

  1. Add the same validation for errorVisibilityTimeout as suggested for FromConfig.
  2. Verify that the constants errorVisibilityTimeout and retainFailedJobs are properly defined for pipeline parameters.

Let's verify the constant definitions:

@coderabbitai (bot) left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (2)
sqsjobs/config.go (1)

41-52: Add constraint documentation for ErrorVisibilityTimeout.

While the documentation for the new fields is comprehensive, it should also mention that ErrorVisibilityTimeout must be lower than VisibilityTimeout when both are non-zero. This constraint is important for users to understand when configuring these values.

 	// If defined (> 0) and RetainFailedJobs is true, RR will change the visibility timeout of failed jobs and let them
 	// be received again, instead of deleting and re-queueing them as new jobs. This allows you to use the automatic SQS
 	// dead-letter feature by setting a maximum receive count on your queue. This produces similar behavior to Elastic
 	// Beanstalk's worker environments.
+	// Note: When both ErrorVisibilityTimeout and VisibilityTimeout are non-zero, ErrorVisibilityTimeout must be lower than VisibilityTimeout.
 	// If this is enabled, your driver credentials must have the sqs:ChangeMessageVisibility permission for the queue.
 	ErrorVisibilityTimeout int32 `mapstructure:"error_visibility_timeout"`
sqsjobs/item.go (1)

197-202: Enhance error handling information for MessageNotInflight case.

When ignoring the MessageNotInflight error, consider including additional context in the log message such as the message ID and queue URL to aid in debugging.

-				log.Println("MessageNotInFlight; ignoring ChangeMessageVisibility")
+				log.Printf("MessageNotInFlight for message %s in queue %s; ignoring ChangeMessageVisibility", i.ID(), *i.Options.queue)
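
For orientation, the call being wrapped here is ChangeMessageVisibility from aws-sdk-go-v2. A hedged sketch of its shape, with the error matching simplified to a code-string check (depending on SDK version the error may also be modeled as a concrete type in the sqs types package):

package example

import (
	"context"
	"errors"
	"strings"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/smithy-go"
)

// changeVisibility shortens how long SQS hides a failed message so it can be
// retried sooner. Ignoring MessageNotInflight mirrors the behavior discussed
// above: if the message is no longer in flight, there is nothing to change.
func changeVisibility(ctx context.Context, client *sqs.Client, queueURL, receipt string, timeout int32) error {
	_, err := client.ChangeMessageVisibility(ctx, &sqs.ChangeMessageVisibilityInput{
		QueueUrl:          aws.String(queueURL),
		ReceiptHandle:     aws.String(receipt),
		VisibilityTimeout: timeout,
	})
	var apiErr smithy.APIError
	if errors.As(err, &apiErr) && strings.Contains(apiErr.ErrorCode(), "MessageNotInflight") {
		return nil // message no longer in flight; safe to ignore
	}
	return err
}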
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 2930aae and 632dbe1.

📒 Files selected for processing (3)
  • sqsjobs/config.go (3 hunks)
  • sqsjobs/item.go (7 hunks)
  • tests/configs/.rr-sqs-error-visibility.yaml (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/configs/.rr-sqs-error-visibility.yaml
🧰 Additional context used
🔇 Additional comments (6)
sqsjobs/config.go (2)

10-20: LGTM! Constants are well-defined and follow AWS SQS constraints.

The new constants are properly organized and follow the existing naming convention. The maxVisibilityTimeout of 43200 seconds (12 hours) correctly aligns with AWS SQS maximum visibility timeout limit.


143-154: ⚠️ Potential issue

Add validation for ErrorVisibilityTimeout vs VisibilityTimeout relationship.

While the code properly validates both timeouts against AWS SQS limits, it's missing validation to ensure ErrorVisibilityTimeout is lower than VisibilityTimeout when both are non-zero. This validation is crucial as having a longer error visibility timeout than the normal visibility timeout could lead to unexpected behavior in job processing.

This issue was previously raised in an earlier review. Please implement the suggested validation to maintain the correct relationship between these timeouts.
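
A minimal sketch of the missing check as a standalone helper; whether the driver clamps, errors out, or logs is a design decision not settled in this thread, so plain errors are shown here:

package sqsjobs

import "fmt"

// validateTimeouts enforces the documented relationship: both values must stay
// within SQS's 0-43200s range, and when both are set the error visibility
// timeout must be lower than the regular one.
func validateTimeouts(visibility, errVisibility int32) error {
	if visibility < 0 || visibility > 43200 {
		return fmt.Errorf("visibility_timeout out of range [0, 43200]: %d", visibility)
	}
	if errVisibility < 0 || errVisibility > 43200 {
		return fmt.Errorf("error_visibility_timeout out of range [0, 43200]: %d", errVisibility)
	}
	if errVisibility > 0 && visibility > 0 && errVisibility >= visibility {
		return fmt.Errorf("error_visibility_timeout (%d) must be lower than visibility_timeout (%d)", errVisibility, visibility)
	}
	return nil
}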

sqsjobs/item.go (4)

64-68: LGTM! Well-documented configuration options.

The new fields are well-structured and properly documented, with appropriate types and JSON tags.


Line range hint 209-223: LGTM! Clean refactoring of Nack methods.

The refactoring effectively reduces code duplication while maintaining clear separation of concerns between regular Nack and NackWithOptions.


377-383: LGTM! Proper initialization of new configuration options.

The new fields are correctly initialized from the driver's configuration, maintaining consistency with the existing initialization pattern.


187-204: Verify visibility timeout behavior across queue types.

Let's verify that the visibility timeout changes work correctly for both standard and FIFO queues.

✅ Verification successful

Based on the shell script results, I can now generate the final response as there's sufficient evidence to verify the visibility timeout behavior across queue types.

Visibility timeout changes are properly implemented for both standard and FIFO queues

The codebase shows correct implementation of visibility timeout changes:

  • The visibility timeout logic is queue-type agnostic and works consistently for both standard and FIFO queues
  • Error visibility timeout is properly bounded (0-43200 seconds) in config.go
  • The implementation correctly handles message visibility changes through ChangeMessageVisibility API
  • FIFO queue detection is properly implemented using .fifo suffix checks
  • Tests verify the behavior with both standard queues (jobs_sqs_test.go) and FIFO queues (jobs_sqs_fifo_test.go)
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check queue types where visibility timeout changes are applied
# Test: Search for queue URL patterns and visibility timeout changes

# Look for FIFO queue handling
rg -A 5 "fifoSuffix|\.fifo"

# Look for visibility timeout related configurations and changes
rg -A 5 "VisibilityTimeout|visibility.*timeout"

Length of output: 13107

@rustatian changed the title from "Add support for ErrorVisibilityTimeout and job retention" to "feature: add support for ErrorVisibilityTimeout and job retention" on Oct 23, 2024
@nickdnk (Contributor, Author) commented Oct 23, 2024

Can we make the pipeline only run manually? It's a lot of spam with failed jobs etc. whenever I fix something in review.

Comment on lines 158 to 164

-func (i *Item) Nack() error {
+func (i *Item) commonNack(requeue bool, delay int) error {
 	if atomic.LoadUint64(i.Options.stopped) == 1 {
-		return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
+		return errors.Str(pipelineStoppedError)
 	}
 	defer func() {
 		i.Options.cond.Signal()
@nickdnk (Contributor, Author):

Because of defer here, we are now also calling this if requeue is true. This did not happen before. I am not sure if this was a bug/oversight before, or if it is important that requeue does not trigger this code:

defer func() {
		i.Options.cond.Signal()
		atomic.AddInt64(i.Options.msgInFlight, ^int64(0))
	}()

Member:

yeah, it should be called in all cases. This tells our internal counter to decrement the number of active messages in flight (aka consumed from listener and pushed to the priority queue). Since we're done with that message anyway (delete after pushing it back to SQS), it is OK to decrement that number.

@nickdnk (Contributor, Author), Oct 23, 2024:

Ok, so it was a bug that it was not called before, on these lines? Or is it a bug that we now always call it even when requeuing?

func (i *Item) NackWithOptions(requeue bool, delay int) error {
	if atomic.LoadUint64(i.Options.stopped) == 1 {
		return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
	}

	// message already deleted
	if i.Options.AutoAck {
		i.Options.cond.Signal()
		atomic.AddInt64(i.Options.msgInFlight, ^int64(0))
		return nil
	}

	if requeue {
                // it was not called in here!
		// requeue message
		err := i.Requeue(nil, delay)
		if err != nil {
			return err
		}
              
		return nil
	}

Member:

Yeah, I guess that was a bug. The counter was incremented in listener.go, so after a requeue we still had +1 message in flight, plus another one when the same message was read again in listener.go.
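
Putting the thread together: the refactored method should decrement the in-flight counter on every path, requeue included. A simplified sketch of that shape, built from the fragments quoted above (the retained-vs-delete branch is elided, and this is not the PR's exact code):

func (i *Item) commonNack(requeue bool, delay int) error {
	if atomic.LoadUint64(i.Options.stopped) == 1 {
		return errors.Str(pipelineStoppedError)
	}

	// Runs on every return path below, including the requeue branch (the fix
	// discussed above): signal the waiter and decrement the in-flight counter,
	// since we are done with this message either way.
	defer func() {
		i.Options.cond.Signal()
		atomic.AddInt64(i.Options.msgInFlight, ^int64(0)) // adds -1
	}()

	// AutoAck: SQS already deleted the message; nothing more to do.
	if i.Options.AutoAck {
		return nil
	}

	// Requeue: push the payload back as a new job, then delete the original.
	if requeue {
		return i.Requeue(nil, delay)
	}

	// Plain nack: with RetainFailedJobs the message stays in the queue
	// (optionally with ErrorVisibilityTimeout applied); otherwise it is
	// deleted and re-queued as before.
	// ... branch elided ...
	return nil
}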

@coderabbitai (bot) left a comment

Actionable comments posted: 3

🧹 Outside diff range and nitpick comments (1)
schema.json (1)

11-16: Consider using versioned schema references.

The schema currently references the master branch of the RoadRunner repository, which could lead to compatibility issues if the referenced schemas change. Consider using specific version tags or commit hashes for more stable references.

Example:

-          "$ref": "https://raw.githubusercontent.com/roadrunner-server/roadrunner/refs/heads/master/schemas/config/3.0.schema.json#/definitions/PipelineProperties/priority"
+          "$ref": "https://raw.githubusercontent.com/roadrunner-server/roadrunner/v2023.3.0/schemas/config/3.0.schema.json#/definitions/PipelineProperties/priority"
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 7d7bd59 and 27e466f.

📒 Files selected for processing (2)
  • schema.json (1 hunks)
  • sqsjobs/item.go (7 hunks)
🧰 Additional context used
🔇 Additional comments (4)
schema.json (3)

1-4: LGTM: Well-structured schema metadata.

The schema metadata is properly defined with appropriate title and description.


30-39: LGTM: Well-defined new configuration properties.

The new error_visibility_timeout and retain_failed_jobs properties are well-documented and properly integrated with clear descriptions of their interdependencies and effects on job handling.


1-191: LGTM: Comprehensive and well-structured schema.

The schema successfully implements the new error_visibility_timeout and retain_failed_jobs features while providing thorough validation rules and documentation for all configuration options. The integration with existing RoadRunner schemas is handled appropriately.

sqsjobs/item.go (1)

379-385: Verify pipeline pointer dereference safety

The code dereferences c.pipeline.Load() without checking if it's nil. While this might be guaranteed by the driver's initialization, it's worth adding a safety check.

✅ Verification successful

Let me gather more context about the pipeline usage and initialization to make a more informed decision.


Let me check how the pipeline is initialized and if there are any safety mechanisms in place.


Let me check one final aspect - the initialization and usage patterns in the driver constructor and related methods.


Based on the gathered evidence, I can now provide a final response:

Pipeline dereference is safe in this context

The code analysis reveals that:

  • The pipeline is stored using atomic.Pointer[jobs.Pipeline] type, which provides thread-safe access
  • The pipeline is properly stored using pipeline.Store() before any operations
  • All usages of c.pipeline.Load() across the codebase follow the same dereferencing pattern
  • The driver's implementation shows consistent pipeline management

The dereferencing of c.pipeline.Load() is a safe operation in this context as it's part of the driver's core functionality and the pipeline is guaranteed to be initialized before use through the driver's lifecycle management.
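
A minimal sketch of that store-before-load lifecycle, with hypothetical type names rather than the driver's actual ones:

package main

import (
	"fmt"
	"sync/atomic"
)

// Pipeline stands in for the jobs.Pipeline interface; everything below is
// illustrative, not copied from the driver.
type Pipeline interface{ Name() string }

type named struct{ name string }

func (n named) Name() string { return n.name }

type Driver struct {
	pipeline atomic.Pointer[Pipeline]
}

func main() {
	d := &Driver{}

	// Store happens once during initialization, before any consumer can
	// call Load, which is what makes the later dereference safe.
	var p Pipeline = named{name: "test-err-visibility"}
	d.pipeline.Store(&p)

	fmt.Println((*d.pipeline.Load()).Name()) // test-err-visibility
}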

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if there are any nil checks for c.pipeline in the codebase
rg -A 2 "c\.pipeline\.Load\(\)" | grep -B 2 "if.*nil"

Length of output: 55


Script:

#!/bin/bash
# Check how pipeline is used in the codebase
rg -A 5 "c\.pipeline" 

# Also check for any initialization patterns
ast-grep --pattern 'type $_ struct {
  $$$
  pipeline $_ 
  $$$
}'

Length of output: 4039


Script:

#!/bin/bash
# Check pipeline initialization and safety patterns
rg -A 5 "pipeline\.Store\(" 

# Check constructor/initialization of the Driver
ast-grep --pattern 'func $_ Driver($$$) {
  $$$
}'

Length of output: 1051


Script:

#!/bin/bash
# Check for New/Init functions and pipeline usage
rg -A 10 "func New" sqsjobs/driver.go

# Also check if there are any panic conditions related to pipeline
rg -B 2 -A 2 "panic\(" sqsjobs/driver.go

Length of output: 82

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (4)
tests/configs/.rr-sqs-error-visibility.yaml (3)

6-9: Consider documenting the relay_timeout value.

The server configuration is correct, but it would be helpful to add a comment explaining why 20s was chosen as the relay timeout value.


18-25: Consider reducing pipeline_size for test environment.

The current pipeline_size of 100,000 seems excessive for a test configuration. Consider reducing it to a smaller value (e.g., 100 or 1000) that's more appropriate for testing purposes while still maintaining test effectiveness.


27-35: LGTM! Configuration aligns with PR objectives.

The SQS pipeline configuration correctly implements the new error visibility timeout feature with appropriate settings. The comment explains the reasoning behind the 120s error visibility timeout.

Consider adding a comment explaining why the maximum visibility timeout (900s) was chosen for better maintainability.

tests/jobs_sqs_test.go (1)

604-715: LGTM! Well-documented test implementation.

The test effectively verifies the error visibility timeout feature with clear stages and comprehensive assertions. The comments provide excellent context about AWS metadata update timing requirements.

Consider extracting the sleep durations into named constants at the package level for better maintainability:

+const (
+    // Time to wait for AWS queue metadata to update (minimum 60s as per AWS docs)
+    queueMetadataUpdateDelay = 110 * time.Second
+    // Initial processing delay
+    initialProcessingDelay = 10 * time.Second
+    // Additional margin for metadata settling
+    metadataSettlingDelay = 90 * time.Second
+)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 386578a and 3b34de8.

📒 Files selected for processing (2)
  • tests/configs/.rr-sqs-error-visibility.yaml (1 hunks)
  • tests/jobs_sqs_test.go (3 hunks)
🧰 Additional context used
📓 Learnings (1)
tests/jobs_sqs_test.go (1)
Learnt from: nickdnk
PR: roadrunner-server/sqs#571
File: tests/jobs_sqs_test.go:705-706
Timestamp: 2024-10-23T01:05:08.661Z
Learning: In the `TestSQSErrorVisibilityTimeout` function in `tests/jobs_sqs_test.go`, when testing AWS SQS queue attributes, the metadata can take time to settle and may report inconsistent values if polled multiple times. Therefore, avoid replacing fixed sleep durations with polling mechanisms in such tests.
🔇 Additional comments (3)
tests/configs/.rr-sqs-error-visibility.yaml (2)

3-4: LGTM! RPC configuration is appropriate for testing.

The RPC configuration correctly uses localhost binding for test environment.


1-35: Skip duplicate comment about missing newline.

The missing newline at end of file issue has already been reported in a previous review.

tests/jobs_sqs_test.go (1)

802-804: Skip comment - issue already identified.

The hardcoded AWS account ID issue was previously identified in the past review comments and the suggestion is still valid.

Also applies to: 813-815

Remove unused PHP test files and clean up the remaining ones
Added options error_visibility_timeout and retain_failed_jobs to SQS driver
Locked opentelemetry docker image to a version that works with tests
Created common method for Nack to deduplicate code
Adjusted durability test from GreaterThan to Equal
Fixed wrong order of expected vs. actual param in some tests
Fix yaml newline
Reword some comments
Adjust error message
change if-else to switch
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (2)
tests/configs/.rr-sqs-error-visibility.yaml (2)

18-25: Consider reducing pipeline size for tests.

The pipeline_size of 100,000 seems excessive for a test configuration. Consider reducing it to a more reasonable value (e.g., 100 or 1000) to avoid unnecessary resource allocation during tests.


30-35: Enhance the configuration comment.

The comment about metadata consistency could be more detailed to help future maintainers understand the specific timing constraints.

-        error_visibility_timeout: 120 # takes ~60s to get consistent metadata from queue for test, so we need a long timeout
+        error_visibility_timeout: 120 # Set to 120s because SQS takes ~60s to ensure consistent queue metadata retrieval during tests, requiring a timeout buffer
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 3b34de8 and 1f18fe9.

📒 Files selected for processing (2)
  • tests/configs/.rr-sqs-error-visibility.yaml (1 hunks)
  • tests/jobs_sqs_test.go (3 hunks)
🧰 Additional context used
📓 Learnings (1)
tests/jobs_sqs_test.go (1)
Learnt from: nickdnk
PR: roadrunner-server/sqs#571
File: tests/jobs_sqs_test.go:705-706
Timestamp: 2024-10-23T01:05:08.661Z
Learning: In the `TestSQSErrorVisibilityTimeout` function in `tests/jobs_sqs_test.go`, when testing AWS SQS queue attributes, the metadata can take time to settle and may report inconsistent values if polled multiple times. Therefore, avoid replacing fixed sleep durations with polling mechanisms in such tests.
🔇 Additional comments (4)
tests/configs/.rr-sqs-error-visibility.yaml (2)

3-16: LGTM! Security configuration looks good.

The RPC listener is properly restricted to localhost, and debug logging is appropriate for test configurations.


6-9: Verify the PHP test file path resolution.

The relative path php_test_files/jobs/jobs_nack.php might cause issues if the working directory changes during test execution.

✅ Verification successful

Since the fd command successfully located the PHP test file at ./tests/php_test_files/jobs/jobs_nack.php, let me verify if this path is consistently used across test configurations by searching for similar patterns.


Let me modify the search pattern to be more inclusive and find all command configurations in the test YAML files.


Let me try one more search with a simpler pattern to find any references to the PHP test files in the configs directory.


Let me try one final verification by checking the actual content of the test configuration file and its directory structure.


The relative path in the configuration is correct and safe

The verification shows that:

  1. The PHP test file jobs_nack.php exists at the correct location tests/php_test_files/jobs/
  2. The relative path php_test_files/jobs/jobs_nack.php in the configuration is appropriate because:
    • The test configuration file is in tests/configs/
    • The PHP file is in tests/php_test_files/jobs/
    • When executing from the tests directory, the relative path will resolve correctly
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Verify the PHP test file exists and is accessible
fd --type f "jobs_nack.php" --exec echo "PHP test file found at: {}"

Length of output: 134


Script:

#!/bin/bash
# Search for PHP test file paths in YAML configurations
rg "command:.+php_test_files/jobs" tests/configs/

Length of output: 49


Script:

#!/bin/bash
# Search for command configurations in YAML files
rg "^\s*command:" tests/configs/ -A 1

Length of output: 39


Script:

#!/bin/bash
# Search for any mentions of php_test_files in the configs
rg "php_test_files" tests/configs/

Length of output: 34


Script:

#!/bin/bash
# Check the content of the configuration file and related directory structure
echo "Configuration file content:"
cat tests/configs/.rr-sqs-error-visibility.yaml

echo -e "\nDirectory structure:"
ls -R tests/php_test_files/jobs/

Length of output: 1074

tests/jobs_sqs_test.go (2)

802-804: Previous review comment about hardcoded AWS account ID is still applicable.

The hardcoded AWS account ID in queue URL assertions should be externalized.

Also applies to: 813-815


604-715: LGTM! Well-structured test with clear stages and comprehensive assertions.

The test effectively validates the error visibility timeout behavior:

  1. Pushes 4 jobs that will fail
  2. Verifies they are processed exactly once
  3. Confirms queue state transitions through visibility timeout periods

The sleep durations are justified by AWS SQS's metadata update requirements (documented in the comments).

Let's verify the test configuration:

✅ Verification successful

Let's try to find the correct configuration file and verify its contents.


I found the relevant configuration file and its settings. The test's behavior aligns with the configuration:

Test configuration is properly set up with correct visibility timeout values

Found in tests/configs/.rr-sqs-error-visibility.yaml:

test-err-visibility:
  driver: sqs
  config:
    visibility_timeout: 900 # maximum for sqs
    queue: default-error-timeout
    retain_failed_jobs: true
    error_visibility_timeout: 120 # takes ~60s to get consistent metadata from queue for test

The test's timing assertions match the configuration:

  • Initial 10s wait for processing
  • 110s wait for queue metadata update (documented AWS requirement of "at least 60 seconds")
  • 90s additional wait to exceed the 120s error visibility timeout

The test correctly validates the SQS error visibility timeout behavior with appropriate margins for AWS's metadata consistency requirements.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the error visibility timeout configuration in the test YAML

# Test: Check the visibility timeout settings in the test config
rg -A 5 "error_visibility_timeout|visibility_timeout" "configs/.rr-sqs-error-visibility.yaml"

Length of output: 171


Script:

#!/bin/bash
# First, let's find all YAML files that might contain the SQS configuration
fd -e yaml -e yml

# Then, let's search for SQS-related configuration in all YAML files
rg -A 10 "sqs|visibility|timeout" -t yaml

Length of output: 30022

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (3)
sqsjobs/item.go (3)

20-21: Consider using standard errors package alias

The alias stderr "errors" is unconventional. Consider using the standard package name.

-	stderr "errors"
+	"errors"

Line range hint 159-214: Consider simplifying error handling flow in commonNack

The switch statement could be simplified using early returns for better readability and maintainability.

-	switch {
-	case !i.Options.RetainFailedJobs:
-		// requeue as new message
-		err := i.Options.requeueFn(context.Background(), i)
-		if err != nil {
-			return err
-		}
-		// Delete original message
-		_, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
-			QueueUrl:      i.Options.queue,
-			ReceiptHandle: i.Options.receiptHandler,
-		})
-		if err != nil {
-			return err
-		}
-	case i.Options.ErrorVisibilityTimeout > 0:
-		// If error visibility is defined change the visibility timeout of the job that failed
-		_, err := i.Options.client.ChangeMessageVisibility(context.Background(), &sqs.ChangeMessageVisibilityInput{
-			QueueUrl:          i.Options.queue,
-			ReceiptHandle:     i.Options.receiptHandler,
-			VisibilityTimeout: i.Options.ErrorVisibilityTimeout,
-		})
-		if err != nil {
-			var notInFlight *types.MessageNotInflight
-			if !stderr.As(err, &notInFlight) {
-				return err
-			}
-		}
-	default:
-		// dont do anything; wait for VisibilityTimeout to expire.
-	}
+	if !i.Options.RetainFailedJobs {
+		if err := i.Options.requeueFn(context.Background(), i); err != nil {
+			return err
+		}
+		
+		if _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+			QueueUrl:      i.Options.queue,
+			ReceiptHandle: i.Options.receiptHandler,
+		}); err != nil {
+			return err
+		}
+		
+		return nil
+	}
+	
+	if i.Options.ErrorVisibilityTimeout > 0 {
+		_, err := i.Options.client.ChangeMessageVisibility(context.Background(), &sqs.ChangeMessageVisibilityInput{
+			QueueUrl:          i.Options.queue,
+			ReceiptHandle:     i.Options.receiptHandler,
+			VisibilityTimeout: i.Options.ErrorVisibilityTimeout,
+		})
+		
+		if err != nil {
+			var notInFlight *types.MessageNotInflight
+			if !errors.As(err, &notInFlight) {
+				return err
+			}
+		}
+		
+		return nil
+	}
+	
+	// dont do anything; wait for VisibilityTimeout to expire.
+	return nil

206-209: Consider adding debug logging for MessageNotInflight error

While the comment explains the rare case well, adding debug logging would help with troubleshooting.

 			// if processing takes longer than the timeout for the message, and no other works pick it up. Should be
 			// very rare though.
 			if !stderr.As(err, &notInFlight) {
 				return err
 			}
+			c.log.Debug("Message not in flight, visibility timeout cannot be changed")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 1f18fe9 and ca95599.

📒 Files selected for processing (26)
  • sqsjobs/config.go (3 hunks)
  • sqsjobs/driver.go (3 hunks)
  • sqsjobs/item.go (7 hunks)
  • tests/configs/.rr-sqs-attr.yaml (1 hunks)
  • tests/configs/.rr-sqs-error-visibility.yaml (1 hunks)
  • tests/configs/.rr-sqs-init-br.yaml (2 hunks)
  • tests/configs/.rr-sqs-init-br_fifo.yaml (2 hunks)
  • tests/configs/.rr-sqs-init.yaml (2 hunks)
  • tests/configs/.rr-sqs-init_fifo.yaml (2 hunks)
  • tests/configs/.rr-sqs-otel.yaml (1 hunks)
  • tests/configs/.rr-sqs-pq.yaml (2 hunks)
  • tests/configs/.rr-sqs-raw.yaml (1 hunks)
  • tests/env/otel-collector-config.yml (2 hunks)
  • tests/jobs_sqs_durability_test.go (1 hunks)
  • tests/jobs_sqs_test.go (3 hunks)
  • tests/php_test_files/jobs/jobs_bad_resp.php (1 hunks)
  • tests/php_test_files/jobs/jobs_create_memory.php (0 hunks)
  • tests/php_test_files/jobs/jobs_err.php (1 hunks)
  • tests/php_test_files/jobs/jobs_nack.php (1 hunks)
  • tests/php_test_files/jobs/jobs_ok.php (1 hunks)
  • tests/php_test_files/jobs/jobs_ok_pq.php (1 hunks)
  • tests/php_test_files/jobs/jobs_ok_queue_name_exist.php (0 hunks)
  • tests/php_test_files/jobs/jobs_ok_sleep1.php (1 hunks)
  • tests/php_test_files/jobs/jobs_ok_slow.php (0 hunks)
  • tests/php_test_files/jobs/jobs_ok_slow_rand.php (0 hunks)
  • tests/php_test_files/jobs/jobs_send.php (0 hunks)
💤 Files with no reviewable changes (5)
  • tests/php_test_files/jobs/jobs_create_memory.php
  • tests/php_test_files/jobs/jobs_ok_queue_name_exist.php
  • tests/php_test_files/jobs/jobs_ok_slow.php
  • tests/php_test_files/jobs/jobs_ok_slow_rand.php
  • tests/php_test_files/jobs/jobs_send.php
🚧 Files skipped from review as they are similar to previous changes (19)
  • sqsjobs/config.go
  • sqsjobs/driver.go
  • tests/configs/.rr-sqs-attr.yaml
  • tests/configs/.rr-sqs-error-visibility.yaml
  • tests/configs/.rr-sqs-init-br.yaml
  • tests/configs/.rr-sqs-init-br_fifo.yaml
  • tests/configs/.rr-sqs-init.yaml
  • tests/configs/.rr-sqs-init_fifo.yaml
  • tests/configs/.rr-sqs-otel.yaml
  • tests/configs/.rr-sqs-pq.yaml
  • tests/configs/.rr-sqs-raw.yaml
  • tests/env/otel-collector-config.yml
  • tests/jobs_sqs_durability_test.go
  • tests/php_test_files/jobs/jobs_bad_resp.php
  • tests/php_test_files/jobs/jobs_err.php
  • tests/php_test_files/jobs/jobs_nack.php
  • tests/php_test_files/jobs/jobs_ok.php
  • tests/php_test_files/jobs/jobs_ok_pq.php
  • tests/php_test_files/jobs/jobs_ok_sleep1.php
🧰 Additional context used
📓 Learnings (1)
tests/jobs_sqs_test.go (1)
Learnt from: nickdnk
PR: roadrunner-server/sqs#571
File: tests/jobs_sqs_test.go:705-706
Timestamp: 2024-10-23T01:05:08.661Z
Learning: In the `TestSQSErrorVisibilityTimeout` function in `tests/jobs_sqs_test.go`, when testing AWS SQS queue attributes, the metadata can take time to settle and may report inconsistent values if polled multiple times. Therefore, avoid replacing fixed sleep durations with polling mechanisms in such tests.
🔇 Additional comments (7)
sqsjobs/item.go (3)

64-68: LGTM: Well-documented configuration options

The new configuration options are well-documented and clearly explain their purpose:

  • ErrorVisibilityTimeout for customizing visibility timeout of failed jobs
  • RetainFailedJobs for controlling job retention behavior
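
For orientation, a trimmed-down sketch of what those two options represent (a hypothetical struct shape; the field names follow the review, everything else is assumed and the real Options carries many more fields):

package sketch

// Options is a sketch only, not the driver's actual definition.
type Options struct {
	// ErrorVisibilityTimeout, when > 0 and RetainFailedJobs is set,
	// changes a failed message's visibility timeout so it can be retried
	// sooner than the full VisibilityTimeout.
	ErrorVisibilityTimeout int32

	// RetainFailedJobs leaves the failed message on the queue instead of
	// deleting and re-publishing it, so SQS's receive count grows and a
	// redrive (dead-letter) policy can eventually move it.
	RetainFailedJobs bool
}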

220-226: LGTM: Clean method implementations

The Nack and NackWithOptions methods are well-implemented, providing a clean interface to the common functionality.


379-385: LGTM: Proper initialization of new fields

The new fields are properly initialized in the unpack method, inheriting values from the driver configuration.

tests/jobs_sqs_test.go (4)

604-623: LGTM: Test setup is well-structured.

The test environment is properly configured with mock logger and necessary plugins.


676-687: LGTM: Job processing verification is thorough.

The test effectively verifies that:

  1. All 4 jobs are pushed successfully
  2. Jobs are processed exactly once (no retries)
  3. Processing completes within the expected timeframe

696-709: LGTM: Queue state verification is comprehensive.

The test properly validates the queue state:

  1. Before timeout: 0 active, 4 reserved messages
  2. After timeout: 4 active, 0 reserved messages

The sleep durations are necessary for AWS SQS metadata updates as documented.
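
For context, a rough sketch of how such a queue-state check can look with the AWS SDK for Go v2 (the client setup and queue URL here are assumptions, not taken from the test):

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

func main() {
	cfg, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	client := sqs.NewFromConfig(cfg)

	// Hypothetical queue URL; the test derives its own.
	queueURL := "https://sqs.us-east-1.amazonaws.com/000000000000/default-error-timeout"

	// ApproximateNumberOfMessages maps to "active" messages,
	// ApproximateNumberOfMessagesNotVisible to "reserved" ones.
	out, err := client.GetQueueAttributes(context.Background(), &sqs.GetQueueAttributesInput{
		QueueUrl: aws.String(queueURL),
		AttributeNames: []types.QueueAttributeName{
			types.QueueAttributeNameApproximateNumberOfMessages,
			types.QueueAttributeNameApproximateNumberOfMessagesNotVisible,
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(out.Attributes)
}

These Approximate* attributes are only eventually consistent, which is exactly why the test relies on fixed sleeps rather than polling.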


802-804: ⚠️ Potential issue

Replace hardcoded AWS account ID with environment variable.

The test contains a hardcoded AWS account ID in the queue URL assertions. This makes the tests less portable and could be a security concern.

Apply this diff to use an environment variable:

-assert.Equal(t, "https://sqs.us-east-1.amazonaws.com/569200086642/test-stat-sqs", out.Queue)
+expectedQueue := fmt.Sprintf("https://sqs.us-east-1.amazonaws.com/%s/test-stat-sqs", os.Getenv("AWS_ACCOUNT_ID"))
+assert.Equal(t, expectedQueue, out.Queue)

Also applies to: 813-815
