From 6898af2c32149ca6bc46e5c1210b4159451b6891 Mon Sep 17 00:00:00 2001 From: Brandur Date: Sun, 19 Nov 2023 16:12:08 -0800 Subject: [PATCH] First release of River Ruby bindings A first push of Ruby bindings for River. Meant to be used in conjunction with a driver like `riverqueue-sequel` [1] to provide an insert-only client for River. See the README for details on usage. Overall, I'm happy at how close I was able to keep the API to the Go version. A lot of syntax in Go just isn't needed due to the more dynamic and implicit nature of Ruby, but the parts that came through are quite close. e.g. We have a job args concept, along with `InsertOpts` that can be added to both jobs and at insert time, just like Go. Purposely not implemented on this first push (I'll follow up with these later on): * Unique jobs. * Batch insert. [1] https://github.com/riverqueue/riverqueue-ruby-sequel --- .github/workflows/ci.yml | 80 +++++++++++++++++++ .gitignore | 1 + Gemfile | 14 ++++ Gemfile.lock | 93 ++++++++++++++++++++++ README.md | 8 -- docs/README.md | 74 ++++++++++++++++++ docs/development.md | 39 +++++++++ lib/client.rb | 59 ++++++++++++++ lib/driver.rb | 48 ++++++++++++ lib/insert_opts.rb | 62 +++++++++++++++ lib/job.rb | 165 +++++++++++++++++++++++++++++++++++++++ lib/riverqueue.rb | 9 +++ riverqueue.gemspec | 16 ++-- spec/client_spec.rb | 161 ++++++++++++++++++++++++++++++++++++++ spec/job_spec.rb | 40 ++++++++++ spec/spec_helper.rb | 12 +++ 16 files changed, 865 insertions(+), 16 deletions(-) create mode 100644 .github/workflows/ci.yml create mode 100644 Gemfile create mode 100644 Gemfile.lock delete mode 100644 README.md create mode 100644 docs/README.md create mode 100644 docs/development.md create mode 100644 lib/client.rb create mode 100644 lib/driver.rb create mode 100644 lib/insert_opts.rb create mode 100644 lib/job.rb create mode 100644 spec/client_spec.rb create mode 100644 spec/job_spec.rb create mode 100644 spec/spec_helper.rb diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..2f05017 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,80 @@ +name: CI + +env: + # Database to connect to that can create other databases with `CREATE DATABASE`. + ADMIN_DATABASE_URL: postgres://postgres:postgres@localhost:5432 + + # Just a common place for steps to put binaries they need and which is added + # to GITHUB_PATH/PATH. + BIN_PATH: /home/runner/bin + + # A suitable URL for a test database. + TEST_DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/riverqueue_ruby_test?sslmode=disable + +on: + - push + +jobs: + lint: + runs-on: ubuntu-latest + timeout-minutes: 3 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Ruby + `bundle install` + uses: ruby/setup-ruby@v1 + with: + ruby-version: "head" + bundler-cache: true # runs 'bundle install' and caches installed gems automatically + + - name: Standard Ruby + run: bundle exec standardrb + + spec: + runs-on: ubuntu-latest + timeout-minutes: 3 + + services: + postgres: + image: postgres + env: + POSTGRES_PASSWORD: postgres + options: >- + --health-cmd pg_isready + --health-interval 2s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Ruby + `bundle install` + uses: ruby/setup-ruby@v1 + with: + ruby-version: "head" + bundler-cache: true # runs 'bundle install' and caches installed gems automatically + + # There is a version of Go on Actions' base image, but it's old and can't + # read modern `go.mod` annotations correctly. + - name: Install Go + uses: actions/setup-go@v4 + with: + go-version: "stable" + check-latest: true + + - name: Create database + run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE riverqueue_ruby_test;" ${ADMIN_DATABASE_URL} + + - name: Install River CLI + run: go install github.com/riverqueue/river/cmd/river@latest + + - name: river migrate-up + run: river migrate-up --database-url "$TEST_DATABASE_URL" + + - name: Rspec + run: bundle exec rspec diff --git a/.gitignore b/.gitignore index c111b33..e8fe1c4 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *.gem +coverage/ diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..f6b3558 --- /dev/null +++ b/Gemfile @@ -0,0 +1,14 @@ +source "https://rubygems.org" + +gemspec + +group :development, :test do + gem "standard" +end + +group :test do + gem "debug" + gem "rspec-core" + gem "rspec-expectations" + gem "simplecov", require: false +end diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 0000000..476e1be --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,93 @@ +PATH + remote: . + specs: + riverqueue (0.0.1) + +GEM + remote: https://rubygems.org/ + specs: + ast (2.4.2) + debug (1.9.1) + irb (~> 1.10) + reline (>= 0.3.8) + diff-lcs (1.5.0) + docile (1.4.0) + io-console (0.7.2) + irb (1.11.2) + rdoc + reline (>= 0.4.2) + json (2.7.1) + language_server-protocol (3.17.0.3) + lint_roller (1.1.0) + parallel (1.24.0) + parser (3.3.0.5) + ast (~> 2.4.1) + racc + psych (5.1.2) + stringio + racc (1.7.3) + rainbow (3.1.1) + rdoc (6.6.2) + psych (>= 4.0.0) + regexp_parser (2.9.0) + reline (0.4.3) + io-console (~> 0.5) + rexml (3.2.6) + rspec-core (3.12.2) + rspec-support (~> 3.12.0) + rspec-expectations (3.12.3) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.12.0) + rspec-support (3.12.1) + rubocop (1.61.0) + json (~> 2.3) + language_server-protocol (>= 3.17.0) + parallel (~> 1.10) + parser (>= 3.3.0.2) + rainbow (>= 2.2.2, < 4.0) + regexp_parser (>= 1.8, < 3.0) + rexml (>= 3.2.5, < 4.0) + rubocop-ast (>= 1.30.0, < 2.0) + ruby-progressbar (~> 1.7) + unicode-display_width (>= 2.4.0, < 3.0) + rubocop-ast (1.31.1) + parser (>= 3.3.0.4) + rubocop-performance (1.20.2) + rubocop (>= 1.48.1, < 2.0) + rubocop-ast (>= 1.30.0, < 2.0) + ruby-progressbar (1.13.0) + simplecov (0.22.0) + docile (~> 1.1) + simplecov-html (~> 0.11) + simplecov_json_formatter (~> 0.1) + simplecov-html (0.12.3) + simplecov_json_formatter (0.1.4) + standard (1.34.0) + language_server-protocol (~> 3.17.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.60) + standard-custom (~> 1.0.0) + standard-performance (~> 1.3) + standard-custom (1.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.50) + standard-performance (1.3.1) + lint_roller (~> 1.1) + rubocop-performance (~> 1.20.2) + stringio (3.1.0) + unicode-display_width (2.5.0) + +PLATFORMS + arm64-darwin-22 + x86_64-linux + +DEPENDENCIES + debug + riverqueue! + rspec-core + rspec-expectations + simplecov + standard + +BUNDLED WITH + 2.4.20 diff --git a/README.md b/README.md deleted file mode 100644 index afa2498..0000000 --- a/README.md +++ /dev/null @@ -1,8 +0,0 @@ -# River Ruby bindings - -A future home for River's Ruby bindings. For now, the [Gem is registered](https://rubygems.org/gems/riverqueue), but nothing else is done. - -``` sh -$ gem build riverqueue.gemspec -$ gem push riverqueue-0.0.1.gem -``` diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..7c11756 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,74 @@ +# River client for Ruby [![Build Status](https://github.com/riverqueue/riverqueue-ruby/workflows/CI/badge.svg)](https://github.com/riverqueue/riverqueue-ruby/actions) + +An insert-only Ruby client for [River](https://github.com/riverqueue/river) packaged in the [`riverqueue` gem](https://rubygems.org/gems/riverqueue). Allows jobs to be inserted in Ruby and run by a Go worker, but doesn't support working jobs in Ruby. + +## Basic usage + +`Gemfile` should contain the core gem and a driver like [`rubyqueue-sequel`](https://github.com/riverqueue/riverqueue-ruby-sequel): + +``` yaml +gem "riverqueue" +gem "riverqueue-sequel" +``` + +Initialize a client with: + +```ruby +DB = Sequel.connect("postgres://...") +client = River::Client.new(River::Driver::Sequel.new(DB)) +``` + +Define a job and insert it: + +```ruby +class SortArgs + attr_accessor :strings + + def initialize(strings:) + self.strings = strings + end + + def kind = "sort" + + def to_json = JSON.dump({strings: strings}) +end + +job = client.insert(SimpleArgs.new(strings: ["whale", "tiger", "bear"])) +``` + +Job args should: + +* Respond to `#kind` with a unique string that identifies them in the database, and which a Go worker will recognize. +* Response to `#to_json` with a JSON serialization that'll be parseable in Go. + +They may also respond to `#insert_opts` with an instance of `InsertOpts` to define insertion options that'll be used for all jobs of the kind. + +### Insertion options + +Inserts take an `insert_opts` parameter to customize features of the inserted job: + +```ruby +job = client.insert( + SimpleArgs.new(strings: ["whale", "tiger", "bear"]), + insert_opts: River::InsertOpts.new( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + ) +) +``` + +### Inserting with a Ruby hash + +`JobArgsHash` can be used to insert with a kind and JSON hash so that it's not necessary to define a class: + +```ruby +job = client.insert(River::JobArgsHash.new("hash_kind", { + job_num: 1 +})) +``` + +## Development + +See [development](./development.md). diff --git a/docs/development.md b/docs/development.md new file mode 100644 index 0000000..05caf15 --- /dev/null +++ b/docs/development.md @@ -0,0 +1,39 @@ +# riverqueue-ruby development + +## Install dependencies + +```shell +$ bundle install +``` +## Run tests + +Create a test database and migrate with River's CLI: + +```shell +$ go install github.com/riverqueue/river/cmd/river +$ createdb riverqueue_ruby_test +$ river migrate-up --database-url "postgres://localhost/riverqueue_ruby_test" +``` + +Run all specs: + +```shell +$ bundle exec rspec spec +``` + +## Run lint + +```shell +$ standardrb --fix +``` + +## Publish a new gem + +```shell +git checkout master && git pull --rebase +VERSION=v0.0.x +gem build riverqueue.gemspec +gem push riverqueue-$VERSION.gem +git tag $VERSION +git push --tags +``` \ No newline at end of file diff --git a/lib/client.rb b/lib/client.rb new file mode 100644 index 0000000..0519c83 --- /dev/null +++ b/lib/client.rb @@ -0,0 +1,59 @@ +module River + MAX_ATTEMPTS_DEFAULT = 25 + PRIORITY_DEFAULT = 1 + QUEUE_DEFAULT = "default" + + # Provides a client for River that inserts jobs. Unlike the Go version of the + # River client, this one can insert jobs only. Jobs can only be worked from Go + # code, so job arg kinds and JSON encoding details must be shared between Ruby + # and Go code. + # + # Used in conjunction with a River driver like: + # + # DB = Sequel.connect(...) + # client = River::Client.new(River::Driver::Sequel.new(DB)) + # + # River drivers are found in separate gems like `riverqueue-sequel` to help + # minimize transient dependencies. + class Client + def initialize(driver) + @driver = driver + end + + # Inserts a new job for work given a job args implementation and insertion + # options (which may be omitted). + # + # Job arg implementations are expected to respond to: + # + # * `#kind`: A string that uniquely identifies the job in the database. + # * `#to_json`: Encodes the args to JSON for persistence in the database. + # Must match encoding an args struct on the Go side to be workable. + # + # They may also respond to `#insert_opts` which is expected to return an + # `InsertOpts` that contains options that will apply to all jobs of this + # kind. Insertion options provided as an argument to `#insert` override + # those returned by job args. + def insert(args, insert_opts: InsertOpts.new) + raise "args should respond to `#kind`" if !args.respond_to?(:kind) + + # ~all objects in Ruby respond to `#to_json`, so check non-nil instead. + args_json = args.to_json + raise "args should return non-nil from `#to_json`" if !args_json + + args_insert_opts = args.respond_to?(:insert_opts) ? args.insert_opts : InsertOpts.new + + scheduled_at = insert_opts.scheduled_at || args_insert_opts.scheduled_at + + @driver.insert(Driver::JobInsertParams.new( + encoded_args: args_json, + kind: args.kind, + max_attempts: insert_opts.max_attempts || args_insert_opts.max_attempts || MAX_ATTEMPTS_DEFAULT, + priority: insert_opts.priority || args_insert_opts.priority || PRIORITY_DEFAULT, + queue: insert_opts.queue || args_insert_opts.queue || QUEUE_DEFAULT, + scheduled_at: scheduled_at, # database default to now + state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE, + tags: insert_opts.tags || args_insert_opts.tags + )) + end + end +end diff --git a/lib/driver.rb b/lib/driver.rb new file mode 100644 index 0000000..d756619 --- /dev/null +++ b/lib/driver.rb @@ -0,0 +1,48 @@ +module River + # Contains an interface used by the top-level River module to interface with + # its driver implementations. All types and methods in this module should be + # considered to be for internal use only and subject to change. API stability + # is not guaranteed. + module Driver + # Insert parameters for a job. This is sent to underlying drivers and is meant + # for internal use only. Its interface is subject to change. + class JobInsertParams + attr_accessor :encoded_args + attr_accessor :kind + attr_accessor :max_attempts + attr_accessor :priority + attr_accessor :queue + attr_accessor :scheduled_at + attr_accessor :state + attr_accessor :tags + + # TODO(brandur): Get these supported. + # attr_accessor :unique + # attr_accessor :unique_by_args + # attr_accessor :unique_by_period + # attr_accessor :unique_by_queue + # attr_accessor :unique_by_state + + def initialize( + encoded_args: nil, + kind: nil, + max_attempts: nil, + priority: nil, + queue: nil, + scheduled_at: nil, + state: nil, + tags: nil + ) + self.encoded_args = encoded_args + self.kind = kind + self.max_attempts = max_attempts + self.priority = priority + self.queue = queue + self.scheduled_at = scheduled_at + self.state = state + self.tags = tags + end + end + end + private_constant :Driver +end diff --git a/lib/insert_opts.rb b/lib/insert_opts.rb new file mode 100644 index 0000000..08fdf16 --- /dev/null +++ b/lib/insert_opts.rb @@ -0,0 +1,62 @@ +module River + class InsertOpts + # MaxAttempts is the maximum number of total attempts (including both the + # original run and all retries) before a job is abandoned and set as + # discarded. + attr_accessor :max_attempts + + # Priority is the priority of the job, with 1 being the highest priority and + # 4 being the lowest. When fetching available jobs to work, the highest + # priority jobs will always be fetched before any lower priority jobs are + # fetched. Note that if your workers are swamped with more high-priority jobs + # then they can handle, lower priority jobs may not be fetched. + # + # Defaults to PRIORITY_DEFAULT. + attr_accessor :priority + + # Queue is the name of the job queue in which to insert the job. + # + # Defaults to QUEUE_DEFAULT. + attr_accessor :queue + + # ScheduledAt is a time in future at which to schedule the job (i.e. in + # cases where it shouldn't be run immediately). The job is guaranteed not + # to run before this time, but may run slightly after depending on the + # number of other scheduled jobs and how busy the queue is. + # + # Use of this option generally only makes sense when passing options into + # Insert rather than when a job args is returning `#insert_opts`, however, + # it will work in both cases. + attr_accessor :scheduled_at + + # Tags are an arbitrary list of keywords to add to the job. They have no + # functional behavior and are meant entirely as a user-specified construct + # to help group and categorize jobs. + # + # If tags are specified from both a job args override and from options on + # Insert, the latter takes precedence. Tags are not merged. + attr_accessor :tags + + # UniqueOpts returns options relating to job uniqueness. An empty struct + # avoids setting any worker-level unique options. + # + # TODO: Implement. + attr_accessor :unique_opts + + def initialize( + max_attempts: nil, + priority: nil, + queue: nil, + scheduled_at: nil, + tags: nil, + unique_opts: nil + ) + self.max_attempts = max_attempts + self.priority = priority + self.queue = queue + self.scheduled_at = scheduled_at + self.tags = tags + self.unique_opts = unique_opts + end + end +end diff --git a/lib/job.rb b/lib/job.rb new file mode 100644 index 0000000..73180bb --- /dev/null +++ b/lib/job.rb @@ -0,0 +1,165 @@ +module River + JOB_STATE_AVAILABLE = "available" + JOB_STATE_CANCELLED = "cancelled" + JOB_STATE_COMPLETED = "completed" + JOB_STATE_DISCARDED = "discarded" + JOB_STATE_RETRYABLE = "retryable" + JOB_STATE_RUNNING = "running" + JOB_STATE_SCHEDULED = "scheduled" + + # Provides a way of creating a job args from a simple Ruby hash for a quick + # way to insert a job without having to define a class. The first argument is + # a "kind" string for identifying the job in the database and the second is a + # hash that will be encoded to JSON. + class JobArgsHash + def initialize(kind, hash) + raise "kind should be non-nil" if !kind + raise "hash should be non-nil" if !hash + + @kind = kind + @hash = hash + end + + attr_reader :kind + + def to_json + JSON.dump(@hash) + end + end + + # JobRow contains the properties of a job that are persisted to the database. + class JobRow + # ID of the job. Generated as part of a Postgres sequence and generally + # ascending in nature, but there may be gaps in it as transactions roll + # back. + attr_accessor :id + + # The attempt number of the job. Jobs are inserted at 0, the number is + # incremented to 1 the first time work its worked, and may increment further + # if it's either snoozed or errors. + attr_accessor :attempt + + # The time that the job was last worked. Starts out as `nil` on a new + # insert. + attr_accessor :attempted_at + + # The set of worker IDs that have worked this job. A worker ID differs + # between different programs, but is shared by all executors within any + # given one. (i.e. Different Go processes have different IDs, but IDs are + # shared within any given process.) A process generates a new ULID (an + # ordered UUID) worker ID when it starts up. + attr_accessor :attempted_by + + # When the job record was created. + attr_accessor :created_at + + # The job's args encoded as JSON. + attr_accessor :encoded_args + + # A set of errors that occurred when the job was worked, one for each + # attempt. Ordered from earliest error to the latest error. + attr_accessor :errors + + # The time at which the job was "finalized", meaning it was either completed + # successfully or errored for the last time such that it'll no longer be + # retried. + attr_accessor :finalized_at + + # Kind uniquely identifies the type of job and instructs which worker + # should work it. It is set at insertion time via `#kind` on job args. + attr_accessor :kind + + # The maximum number of attempts that the job will be tried before it errors + # for the last time and will no longer be worked. + attr_accessor :max_attempts + + # The priority of the job, with 1 being the highest priority and 4 being the + # lowest. When fetching available jobs to work, the highest priority jobs + # will always be fetched before any lower priority jobs are fetched. Note + # that if your workers are swamped with more high-priority jobs then they + # can handle, lower priority jobs may not be fetched. + attr_accessor :priority + + # The name of the queue where the job will be worked. Queues can be + # configured independently and be used to isolate jobs. + attr_accessor :queue + + # When the job is scheduled to become available to be worked. Jobs default + # to running immediately, but may be scheduled for the future when they're + # inserted. They may also be scheduled for later because they were snoozed + # or because they errored and have additional retry attempts remaining. + attr_accessor :scheduled_at + + # The state of job like `available` or `completed`. Jobs are `available` + # when they're first inserted. + attr_accessor :state + + # Tags are an arbitrary list of keywords to add to the job. They have no + # functional behavior and are meant entirely as a user-specified construct + # to help group and categorize jobs. + attr_accessor :tags + + def initialize( + id: nil, + attempt: nil, + attempted_by: nil, + created_at: nil, + encoded_args: nil, + errors: nil, + finalized_at: nil, + kind: nil, + max_attempts: nil, + priority: nil, + queue: nil, + scheduled_at: nil, + state: nil, + tags: nil + ) + self.id = id + self.attempt = attempt + self.attempted_by = attempted_by + self.created_at = created_at + self.encoded_args = encoded_args + self.errors = errors + self.finalized_at = finalized_at + self.kind = kind + self.max_attempts = max_attempts + self.priority = priority + self.queue = queue + self.scheduled_at = scheduled_at + self.state = state + self.tags = tags + end + end + + # A failed job work attempt containing information about the error or panic + # that occurred. + class AttemptError + # The time at which the error occurred. + attr_accessor :at + + # The attempt number on which the error occurred (maps to #attempt on a job + # row). + attr_accessor :attempt + + # Contains the stringified error of an error returned from a job or a panic + # value in case of a panic. + attr_accessor :error + + # Contains a stack trace from a job that panicked. The trace is produced by + # invoking `debug.Trace()`. + attr_accessor :trace + + def initialize( + at: nil, + attempt: nil, + error: nil, + trace: nil + ) + self.at = at + self.attempt = attempt + self.error = error + self.trace = trace + end + end +end diff --git a/lib/riverqueue.rb b/lib/riverqueue.rb index e69de29..e77b938 100644 --- a/lib/riverqueue.rb +++ b/lib/riverqueue.rb @@ -0,0 +1,9 @@ +require "json" + +require_relative "client" +require_relative "driver" +require_relative "insert_opts" +require_relative "job" + +module River +end diff --git a/riverqueue.gemspec b/riverqueue.gemspec index 1da7766..c7e2ba3 100644 --- a/riverqueue.gemspec +++ b/riverqueue.gemspec @@ -1,11 +1,11 @@ Gem::Specification.new do |s| - s.name = "riverqueue" - s.version = "0.0.1" - s.summary = "A fast job queue for Go" + s.name = "riverqueue" + s.version = "0.0.1" + s.summary = "A fast job queue for Go." s.description = "A fast job queue for Go." - s.authors = ["Blake Gentry", "Brandur Leach"] - s.email = "brandur@brandur.org" - s.files = ["lib/riverqueue.rb"] - s.homepage = "https://riverqueue.com" - s.license = "LGPL-3.0-or-later" + s.authors = ["Blake Gentry", "Brandur Leach"] + s.email = "brandur@brandur.org" + s.files = ["lib/riverqueue.rb"] + s.homepage = "https://riverqueue.com" + s.license = "LGPL-3.0-or-later" end diff --git a/spec/client_spec.rb b/spec/client_spec.rb new file mode 100644 index 0000000..63aea4e --- /dev/null +++ b/spec/client_spec.rb @@ -0,0 +1,161 @@ +require "spec_helper" + +# We use a mock here, but each driver has a more comprehensive test suite that +# performs full integration level tests. +class MockDriver + def initialize + @insert_params = [] + @next_id = 0 + end + + def insert(insert_params) + @insert_params << insert_params + + River::JobRow.new( + id: (@next_id += 1), + attempt: 0, + attempted_by: nil, + created_at: Time.now, + encoded_args: insert_params.encoded_args, + errors: nil, + finalized_at: nil, + kind: insert_params.kind, + max_attempts: insert_params.max_attempts, + priority: insert_params.priority, + queue: insert_params.queue, + scheduled_at: insert_params.scheduled_at || Time.now, # normally defaults from DB + state: insert_params.state, + tags: insert_params.tags + ) + end +end + +class SimpleArgs + attr_accessor :job_num + + def initialize(job_num:) + self.job_num = job_num + end + + def kind = "simple" + + def to_json = JSON.dump({job_num: job_num}) +end + +# Lets us test job-specific insertion opts by making `#insert_opts` an accessor. +# Real args that make use of this functionality will probably want to make +# `#insert_opts` a non-accessor method instead. +class SimpleArgsWithInsertOpts < SimpleArgs + attr_accessor :insert_opts +end + +RSpec.describe River::Client do + let(:client) { River::Client.new(mock_driver) } + let(:mock_driver) { MockDriver.new } + + describe "#insert" do + it "inserts a job with defaults" do + job = client.insert(SimpleArgs.new(job_num: 1)) + expect(job).to have_attributes( + id: 1, + attempt: 0, + created_at: be_within(2).of(Time.now), + encoded_args: %({"job_num":1}), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now), + state: River::JOB_STATE_AVAILABLE, + tags: nil + ) + end + + it "schedules a job" do + target_time = Time.now + 1 * 3600 + + job = client.insert( + SimpleArgs.new(job_num: 1), + insert_opts: River::InsertOpts.new(scheduled_at: target_time) + ) + expect(job).to have_attributes( + scheduled_at: be_within(2).of(target_time), + state: River::JOB_STATE_SCHEDULED + ) + end + + it "inserts with job insert opts" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + job = client.insert(args) + expect(job).to have_attributes( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + end + + it "inserts with insert opts" do + # We set job insert opts in this spec too so that we can verify that the + # options passed at insertion time take precedence. + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + job = client.insert(args, insert_opts: River::InsertOpts.new( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + )) + expect(job).to have_attributes( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + ) + end + + it "inserts with job args hash" do + job = client.insert(River::JobArgsHash.new("hash_kind", { + job_num: 1 + })) + expect(job).to have_attributes( + encoded_args: %({"job_num":1}), + kind: "hash_kind" + ) + end + + it "errors if args don't respond to #kind" do + args_klass = Class.new do + def to_json = {} + end + + expect do + client.insert(args_klass.new) + end.to raise_error(RuntimeError, "args should respond to `#kind`") + end + + it "errors if args return nil from #to_json" do + args_klass = Class.new do + def kind = "args_kind" + def to_json = nil + end + + expect do + client.insert(args_klass.new) + end.to raise_error(RuntimeError, "args should return non-nil from `#to_json`") + end + end +end diff --git a/spec/job_spec.rb b/spec/job_spec.rb new file mode 100644 index 0000000..d34b40a --- /dev/null +++ b/spec/job_spec.rb @@ -0,0 +1,40 @@ +require "spec_helper" + +describe River::JobArgsHash do + it "generates a job args based on a hash" do + args = River::JobArgsHash.new("my_hash_kind", {job_num: 123}) + expect(args.kind).to eq("my_hash_kind") + expect(args.to_json).to eq(JSON.dump({job_num: 123})) + end + + it "errors on a nil kind" do + expect do + River::JobArgsHash.new(nil, {job_num: 123}) + end.to raise_error(RuntimeError, "kind should be non-nil") + end + + it "errors on a nil hash" do + expect do + River::JobArgsHash.new("my_hash_kind", nil) + end.to raise_error(RuntimeError, "hash should be non-nil") + end +end + +describe River::AttemptError do + it "initializes with parameters" do + now = Time.now + + attempt_error = River::AttemptError.new( + at: now, + attempt: 1, + error: "job failure", + trace: "error trace" + ) + expect(attempt_error).to have_attributes( + at: now, + attempt: 1, + error: "job failure", + trace: "error trace" + ) + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..f99844c --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,12 @@ +# Only show coverage information if running the entire suite. +if RSpec.configuration.files_to_run.length > 1 + require "simplecov" + SimpleCov.start do + enable_coverage :branch + minimum_coverage line: 100, branch: 100 + end +end + +require "debug" + +require "riverqueue"