Skip to content

Commit

Permalink
Check tag format + return metadata + additional doc strings
Browse files Browse the repository at this point in the history
Mainly, bring in tag format checks which verify they're shorter than 255
characters and don't contain any special characters (especially commas),
like Go and Python do already.

Start returning metadata in jobs, although notably, it's not possible to
insert with it yet.

A few additional docstrings brought over from my project to document
River Python.
  • Loading branch information
brandur committed Jul 6, 2024
1 parent 7e36d7b commit 7231c0b
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 8 deletions.
2 changes: 1 addition & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ insert_res.unique_skipped_as_duplicated

### Custom advisory lock prefix

Unique job insertion takes a Postgres advisory lock to make sure that it's uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:
Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:

```ruby
client = River::Client.new(mock_driver, advisory_lock_prefix: 123456)
Expand Down
1 change: 1 addition & 0 deletions driver/riverqueue-activerecord/lib/driver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def transaction(&)
finalized_at: river_job.finalized_at,
kind: river_job.kind,
max_attempts: river_job.max_attempts,
metadata: river_job.metadata,
priority: river_job.priority,
queue: river_job.queue,
scheduled_at: river_job.scheduled_at,
Expand Down
1 change: 1 addition & 0 deletions driver/riverqueue-sequel/lib/driver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def transaction(&)
finalized_at: river_job.finalized_at,
kind: river_job.kind,
max_attempts: river_job.max_attempts,
metadata: river_job.metadata,
priority: river_job.priority,
queue: river_job.queue,
scheduled_at: river_job.scheduled_at,
Expand Down
16 changes: 15 additions & 1 deletion lib/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@
require "time"

module River
# Default number of maximum attempts for a job.
MAX_ATTEMPTS_DEFAULT = 25

# Default priority for a job.
PRIORITY_DEFAULT = 1

# Default queue for a job.
QUEUE_DEFAULT = "default"

# Provides a client for River that inserts jobs. Unlike the Go version of the
Expand Down Expand Up @@ -241,7 +246,7 @@ def insert_many(args)
queue: insert_opts.queue || args_insert_opts.queue || QUEUE_DEFAULT,
scheduled_at: scheduled_at&.utc, # database defaults to now
state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE,
tags: insert_opts.tags || args_insert_opts.tags
tags: validate_tags(insert_opts.tags || args_insert_opts.tags)
),
unique_opts
]
Expand All @@ -260,6 +265,15 @@ def insert_many(args)
private def uint64_to_int64(int)
[int].pack("Q").unpack1("q") #: Integer # rubocop:disable Layout/LeadingCommentSpace
end

TAG_RE = /\A[\w][\w\-]+[\w]\z/

private def validate_tags(tags)
tags&.each do |tag|
raise ArgumentError, "tags should be 255 characters or less" if tag.length > 255
raise ArgumentError, "tag should match regex #{TAG_RE.inspect}" unless TAG_RE.match(tag)
end
end
end

# A single job to insert that's part of an #insert_many batch insert. Unlike
Expand Down
13 changes: 9 additions & 4 deletions lib/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ class JobRow
# The set of worker IDs that have worked this job. A worker ID differs
# between different programs, but is shared by all executors within any
# given one. (i.e. Different Go processes have different IDs, but IDs are
# shared within any given process.) A process generates a new ULID (an
# ordered UUID) worker ID when it starts up.
# shared within any given process.) A process generates a new ID based on
# host and current time when it starts up.
attr_accessor :attempted_by

# When the job record was created.
attr_accessor :created_at

# A set of errors that occurred when the job was worked, one for each
# attempt. Ordered from earliest error to the latest error.
# attempt. Ordered from earliest error to the latest error.
attr_accessor :errors

# The time at which the job was "finalized", meaning it was either completed
Expand All @@ -79,6 +79,9 @@ class JobRow
# for the last time and will no longer be worked.
attr_accessor :max_attempts

# Arbitrary metadata associated with the job.
attr_accessor :metadata

# The priority of the job, with 1 being the highest priority and 4 being the
# lowest. When fetching available jobs to work, the highest priority jobs
# will always be fetched before any lower priority jobs are fetched. Note
Expand Down Expand Up @@ -112,6 +115,7 @@ def initialize(
created_at:,
kind:,
max_attempts:,
metadata:,
priority:,
queue:,
scheduled_at:,
Expand All @@ -134,6 +138,7 @@ def initialize(
self.finalized_at = finalized_at
self.kind = kind
self.max_attempts = max_attempts
self.metadata = metadata
self.priority = priority
self.queue = queue
self.scheduled_at = scheduled_at
Expand All @@ -157,7 +162,7 @@ class AttemptError
attr_accessor :error

# Contains a stack trace from a job that panicked. The trace is produced by
# invoking `debug.Trace()`.
# invoking `debug.Trace()` in Go.
attr_accessor :trace

def initialize(
Expand Down
6 changes: 5 additions & 1 deletion sig/client.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ module River
def insert_many: (Array[jobArgs | InsertManyParams]) -> Integer

private def check_unique_job: (Driver::JobInsertParams, UniqueOpts?) { () -> InsertResult } -> InsertResult
private def uint64_to_int64: (Integer) -> Integer
private def make_insert_params: (jobArgs, InsertOpts, ?is_insert_many: bool) -> [Driver::JobInsertParams, UniqueOpts?]
private def truncate_time: (Time, Integer) -> Time
private def uint64_to_int64: (Integer) -> Integer

TAG_RE: Regexp

private def validate_tags: (Array[String]?) -> Array[String]?
end

class InsertManyParams
Expand Down
3 changes: 2 additions & 1 deletion sig/job.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ module River
attr_accessor finalized_at: Time?
attr_accessor kind: String
attr_accessor max_attempts: Integer
attr_accessor metadata: Hash[String, untyped]
attr_accessor priority: Integer
attr_accessor queue: String
attr_accessor scheduled_at: Time
attr_accessor state: jobStateAll
attr_accessor tags: Array[String]?

def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?) -> void
def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, metadata: Hash[String, untyped], priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?) -> void
end

class AttemptError
Expand Down
17 changes: 17 additions & 0 deletions spec/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def transaction(&)
finalized_at: nil,
kind: insert_params.kind,
max_attempts: insert_params.max_attempts,
metadata: nil,
priority: insert_params.priority,
queue: insert_params.queue,
scheduled_at: insert_params.scheduled_at || Time.now, # normally defaults from DB
Expand Down Expand Up @@ -194,6 +195,22 @@ def to_json = nil
end.to raise_error(RuntimeError, "args should return non-nil from `#to_json`")
end

it "raises error if tags are too long" do
expect do
client.insert(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new(
tags: ["a" * 256]
))
end.to raise_error(ArgumentError, "tags should be 255 characters or less")
end

it "raises error if tags are misformatted" do
expect do
client.insert(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new(
tags: ["no,commas,allowed"]
))
end.to raise_error(ArgumentError, 'tag should match regex /\A[\w][\w\-]+[\w]\z/')
end

def check_bigint_bounds(int)
raise "lock key shouldn't be larger than Postgres bigint max (9223372036854775807); was: #{int}" if int > 9223372036854775807
raise "lock key shouldn't be smaller than Postgres bigint min (-9223372036854775808); was: #{int}" if int < -9223372036854775808
Expand Down

0 comments on commit 7231c0b

Please sign in to comment.