diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..d6b30d0 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,132 @@ +name: CI + +env: + # Database to connect to that can create other databases with `CREATE DATABASE`. + ADMIN_DATABASE_URL: postgres://postgres:postgres@localhost:5432 + + # Just a common place for steps to put binaries they need and which is added + # to GITHUB_PATH/PATH. + BIN_PATH: /home/runner/bin + + # A suitable URL for a test database. + TEST_DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/riverqueue_ruby_test?sslmode=disable + +on: + - push + +jobs: + lint: + runs-on: ubuntu-latest + timeout-minutes: 3 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Ruby + `bundle install` + uses: ruby/setup-ruby@v1 + with: + ruby-version: "head" + bundler-cache: true # runs 'bundle install' and caches installed gems automatically + + - name: Standard Ruby (riverqueue-ruby) + run: bundle exec standardrb + working-directory: . + + - name: bundle install (riverqueue-activerecord) + run: bundle install + working-directory: ./drivers/riverqueue-activerecord + + - name: Standard Ruby (riverqueue-activerecord) + run: bundle exec standardrb + working-directory: ./drivers/riverqueue-activerecord + + - name: bundle install (riverqueue-sequel) + run: bundle install + working-directory: ./drivers/riverqueue-sequel + + - name: Standard Ruby (riverqueue-sequel) + run: bundle exec standardrb + working-directory: ./drivers/riverqueue-sequel + + type_check: + runs-on: ubuntu-latest + timeout-minutes: 3 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Ruby + `bundle install` + uses: ruby/setup-ruby@v1 + with: + ruby-version: "head" + bundler-cache: true # runs 'bundle install' and caches installed gems automatically + + - name: Steep (riverqueue-ruby) + run: bundle exec steep check + working-directory: . + + spec: + runs-on: ubuntu-latest + timeout-minutes: 3 + + services: + postgres: + image: postgres + env: + POSTGRES_PASSWORD: postgres + options: >- + --health-cmd pg_isready + --health-interval 2s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Ruby + `bundle install` + uses: ruby/setup-ruby@v1 + with: + ruby-version: "head" + bundler-cache: true # runs 'bundle install' and caches installed gems automatically + + # There is a version of Go on Actions' base image, but it's old and can't + # read modern `go.mod` annotations correctly. + - name: Install Go + uses: actions/setup-go@v4 + with: + go-version: "stable" + check-latest: true + + - name: Create database + run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE riverqueue_ruby_test;" ${ADMIN_DATABASE_URL} + + - name: Install River CLI + run: go install github.com/riverqueue/river/cmd/river@latest + + - name: river migrate-up + run: river migrate-up --database-url "$TEST_DATABASE_URL" + + - name: Rspec (riverqueue-ruby) + run: bundle exec rspec + working-directory: . + + - name: bundle install (riverqueue-activerecord) + run: bundle install + working-directory: ./drivers/riverqueue-activerecord + + - name: Rspec (riverqueue-activerecord) + run: bundle exec rspec + working-directory: ./drivers/riverqueue-activerecord + + - name: bundle install (riverqueue-sequel) + run: bundle install + working-directory: ./drivers/riverqueue-sequel + + - name: Rspec (riverqueue-sequel) + run: bundle exec rspec + working-directory: ./drivers/riverqueue-sequel diff --git a/.gitignore b/.gitignore index c111b33..e8fe1c4 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *.gem +coverage/ diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..213a8d2 --- /dev/null +++ b/Gemfile @@ -0,0 +1,15 @@ +source "https://rubygems.org" + +gemspec + +group :development, :test do + gem "standard" + gem "steep" +end + +group :test do + gem "debug" + gem "rspec-core" + gem "rspec-expectations" + gem "simplecov", require: false +end diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 0000000..0c40957 --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,147 @@ +PATH + remote: . + specs: + riverqueue (0.0.1) + +GEM + remote: https://rubygems.org/ + specs: + abbrev (0.1.2) + activesupport (7.1.3.2) + base64 + bigdecimal + concurrent-ruby (~> 1.0, >= 1.0.2) + connection_pool (>= 2.2.5) + drb + i18n (>= 1.6, < 2) + minitest (>= 5.1) + mutex_m + tzinfo (~> 2.0) + ast (2.4.2) + base64 (0.2.0) + bigdecimal (3.1.7) + concurrent-ruby (1.2.3) + connection_pool (2.4.1) + csv (3.3.0) + debug (1.9.1) + irb (~> 1.10) + reline (>= 0.3.8) + diff-lcs (1.5.0) + docile (1.4.0) + drb (2.2.1) + ffi (1.16.3) + fileutils (1.7.2) + i18n (1.14.4) + concurrent-ruby (~> 1.0) + io-console (0.7.2) + irb (1.11.2) + rdoc + reline (>= 0.4.2) + json (2.7.1) + language_server-protocol (3.17.0.3) + lint_roller (1.1.0) + listen (3.9.0) + rb-fsevent (~> 0.10, >= 0.10.3) + rb-inotify (~> 0.9, >= 0.9.10) + logger (1.6.0) + minitest (5.22.3) + mutex_m (0.2.0) + parallel (1.24.0) + parser (3.3.0.5) + ast (~> 2.4.1) + racc + psych (5.1.2) + stringio + racc (1.7.3) + rainbow (3.1.1) + rb-fsevent (0.11.2) + rb-inotify (0.10.1) + ffi (~> 1.0) + rbs (3.4.4) + abbrev + rdoc (6.6.2) + psych (>= 4.0.0) + regexp_parser (2.9.0) + reline (0.4.3) + io-console (~> 0.5) + rexml (3.2.6) + rspec-core (3.12.2) + rspec-support (~> 3.12.0) + rspec-expectations (3.12.3) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.12.0) + rspec-support (3.12.1) + rubocop (1.61.0) + json (~> 2.3) + language_server-protocol (>= 3.17.0) + parallel (~> 1.10) + parser (>= 3.3.0.2) + rainbow (>= 2.2.2, < 4.0) + regexp_parser (>= 1.8, < 3.0) + rexml (>= 3.2.5, < 4.0) + rubocop-ast (>= 1.30.0, < 2.0) + ruby-progressbar (~> 1.7) + unicode-display_width (>= 2.4.0, < 3.0) + rubocop-ast (1.31.1) + parser (>= 3.3.0.4) + rubocop-performance (1.20.2) + rubocop (>= 1.48.1, < 2.0) + rubocop-ast (>= 1.30.0, < 2.0) + ruby-progressbar (1.13.0) + securerandom (0.3.1) + simplecov (0.22.0) + docile (~> 1.1) + simplecov-html (~> 0.11) + simplecov_json_formatter (~> 0.1) + simplecov-html (0.12.3) + simplecov_json_formatter (0.1.4) + standard (1.34.0) + language_server-protocol (~> 3.17.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.60) + standard-custom (~> 1.0.0) + standard-performance (~> 1.3) + standard-custom (1.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.50) + standard-performance (1.3.1) + lint_roller (~> 1.1) + rubocop-performance (~> 1.20.2) + steep (1.6.0) + activesupport (>= 5.1) + concurrent-ruby (>= 1.1.10) + csv (>= 3.0.9) + fileutils (>= 1.1.0) + json (>= 2.1.0) + language_server-protocol (>= 3.15, < 4.0) + listen (~> 3.0) + logger (>= 1.3.0) + parser (>= 3.1) + rainbow (>= 2.2.2, < 4.0) + rbs (>= 3.1.0) + securerandom (>= 0.1) + strscan (>= 1.0.0) + terminal-table (>= 2, < 4) + stringio (3.1.0) + strscan (3.1.0) + terminal-table (3.0.2) + unicode-display_width (>= 1.1.1, < 3) + tzinfo (2.0.6) + concurrent-ruby (~> 1.0) + unicode-display_width (2.5.0) + +PLATFORMS + arm64-darwin-22 + x86_64-linux + +DEPENDENCIES + debug + riverqueue! + rspec-core + rspec-expectations + simplecov + standard + steep + +BUNDLED WITH + 2.4.20 diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f048067 --- /dev/null +++ b/Makefile @@ -0,0 +1,17 @@ +.PHONY: lint +lint: standardrb + +.PHONY: rspec +rspec: spec + +.PHONY: spec +spec: + bundle exec rspec + cd drivers/riverqueue-activerecord && bundle exec rspec + cd drivers/riverqueue-sequel && bundle exec rspec + +.PHONY: standardrb +standardrb: + bundle exec standardrb --fix + cd drivers/riverqueue-activerecord && bundle exec standardrb --fix + cd drivers/riverqueue-sequel && bundle exec standardrb --fix diff --git a/Steepfile b/Steepfile new file mode 100644 index 0000000..620dab0 --- /dev/null +++ b/Steepfile @@ -0,0 +1,11 @@ +D = Steep::Diagnostic + +target :lib do + check "lib" + + library "json" + + signature "sig" + + configure_code_diagnostics(D::Ruby.strict) +end diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..eb564c6 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,104 @@ +# River client for Ruby [![Build Status](https://github.com/riverqueue/riverqueue-ruby/workflows/CI/badge.svg)](https://github.com/riverqueue/riverqueue-ruby/actions) + +An insert-only Ruby client for [River](https://github.com/riverqueue/river) packaged in the [`riverqueue` gem](https://rubygems.org/gems/riverqueue). Allows jobs to be inserted in Ruby and run by a Go worker, but doesn't support working jobs in Ruby. + +## Basic usage + +`Gemfile` should contain the core gem and a driver like [`rubyqueue-sequel`](https://github.com/riverqueue/riverqueue-ruby/drivers/riverqueue-sequel) (see [drivers](#drivers)): + +``` ruby +gem "riverqueue" +gem "riverqueue-sequel" +``` + +Initialize a client with: + +```ruby +DB = Sequel.connect("postgres://...") +client = River::Client.new(River::Driver::Sequel.new(DB)) +``` + +Define a job and insert it: + +```ruby +class SortArgs + attr_accessor :strings + + def initialize(strings:) + self.strings = strings + end + + def kind = "sort" + + def to_json = JSON.dump({strings: strings}) +end + +insert_res = client.insert(SimpleArgs.new(strings: ["whale", "tiger", "bear"])) +``` + +Job args should: + +* Respond to `#kind` with a unique string that identifies them in the database, and which a Go worker will recognize. +* Response to `#to_json` with a JSON serialization that'll be parseable as an object in Go. + +They may also respond to `#insert_opts` with an instance of `InsertOpts` to define insertion options that'll be used for all jobs of the kind. + +### Insertion options + +Inserts take an `insert_opts` parameter to customize features of the inserted job: + +```ruby +insert_res = client.insert( + SimpleArgs.new(strings: ["whale", "tiger", "bear"]), + insert_opts: River::InsertOpts.new( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + ) +) +``` + +### Inserting with a Ruby hash + +`JobArgsHash` can be used to insert with a kind and JSON hash so that it's not necessary to define a class: + +```ruby +insert_res = client.insert(River::JobArgsHash.new("hash_kind", { + job_num: 1 +})) +``` + +## Drivers + +### ActiveRecord + +``` ruby +gem "riverqueue" +gem "riverqueue-activerecord" +``` + +Initialize driver and client: + +```ruby +ActiveRecord::Base.establish_connection("postgres://...") +client = River::Client.new(River::Driver::ActiveRecord.new) +``` + +### Sequel + +``` ruby +gem "riverqueue" +gem "riverqueue-sequel" +``` + +Initialize driver and client: + +```ruby +DB = Sequel.connect("postgres://...") +client = River::Client.new(River::Driver::Sequel.new(DB)) +``` + +## Development + +See [development](./development.md). diff --git a/docs/development.md b/docs/development.md new file mode 100644 index 0000000..71818eb --- /dev/null +++ b/docs/development.md @@ -0,0 +1,54 @@ +# riverqueue-ruby development + +## Install dependencies + +```shell +$ bundle install +``` +## Run tests + +Create a test database and migrate with River's CLI: + +```shell +$ go install github.com/riverqueue/river/cmd/river +$ createdb riverqueue_ruby_test +$ river migrate-up --database-url "postgres://localhost/riverqueue_ruby_test" +``` + +Run all specs: + +```shell +$ bundle exec rspec spec +``` + +## Run lint + +```shell +$ bundle exec standardrb --fix +``` + +## Run type check (Steep) + +```shell +$ bundle exec steep check +``` + +## Code coverage + +Running the entire test suite will produce a coverage report, and will fail if line and branch coverage is below 100%. Run the suite and open `coverage/index.html` to find lines or branches that weren't covered: + +```shell +$ bundle exec rspec spec +$ open coverage/index.html +``` + +## Publish a new gem + +```shell +git checkout master && git pull --rebase +VERSION=v0.0.x +gem build riverqueue.gemspec +gem push riverqueue-$VERSION.gem +git tag $VERSION +git push --tags +``` \ No newline at end of file diff --git a/drivers/riverqueue-activerecord/Gemfile b/drivers/riverqueue-activerecord/Gemfile new file mode 100644 index 0000000..3fd0fb5 --- /dev/null +++ b/drivers/riverqueue-activerecord/Gemfile @@ -0,0 +1,15 @@ +source "https://rubygems.org" + +gemspec + +group :development, :test do + gem "riverqueue", path: "../.." + gem "standard" +end + +group :test do + gem "debug" + gem "rspec-core" + gem "rspec-expectations" + gem "simplecov", require: false +end diff --git a/drivers/riverqueue-activerecord/Gemfile.lock b/drivers/riverqueue-activerecord/Gemfile.lock new file mode 100644 index 0000000..d86dfd4 --- /dev/null +++ b/drivers/riverqueue-activerecord/Gemfile.lock @@ -0,0 +1,132 @@ +PATH + remote: ../.. + specs: + riverqueue (0.0.1) + +PATH + remote: . + specs: + riverqueue-activerecord (0.0.1) + activerecord + activesupport + pg + +GEM + remote: https://rubygems.org/ + specs: + activemodel (7.1.3.2) + activesupport (= 7.1.3.2) + activerecord (7.1.3.2) + activemodel (= 7.1.3.2) + activesupport (= 7.1.3.2) + timeout (>= 0.4.0) + activesupport (7.1.3.2) + base64 + bigdecimal + concurrent-ruby (~> 1.0, >= 1.0.2) + connection_pool (>= 2.2.5) + drb + i18n (>= 1.6, < 2) + minitest (>= 5.1) + mutex_m + tzinfo (~> 2.0) + ast (2.4.2) + base64 (0.2.0) + bigdecimal (3.1.6) + concurrent-ruby (1.2.3) + connection_pool (2.4.1) + debug (1.9.1) + irb (~> 1.10) + reline (>= 0.3.8) + diff-lcs (1.5.1) + docile (1.4.0) + drb (2.2.1) + i18n (1.14.3) + concurrent-ruby (~> 1.0) + racc (~> 1.7) + io-console (0.7.2) + irb (1.11.2) + rdoc + reline (>= 0.4.2) + json (2.7.1) + language_server-protocol (3.17.0.3) + lint_roller (1.1.0) + minitest (5.22.2) + mutex_m (0.2.0) + parallel (1.24.0) + parser (3.3.0.5) + ast (~> 2.4.1) + racc + pg (1.5.4) + psych (5.1.2) + stringio + racc (1.7.3) + rainbow (3.1.1) + rdoc (6.6.2) + psych (>= 4.0.0) + regexp_parser (2.9.0) + reline (0.4.3) + io-console (~> 0.5) + rexml (3.2.6) + rspec-core (3.13.0) + rspec-support (~> 3.13.0) + rspec-expectations (3.13.0) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.13.0) + rspec-support (3.13.1) + rubocop (1.61.0) + json (~> 2.3) + language_server-protocol (>= 3.17.0) + parallel (~> 1.10) + parser (>= 3.3.0.2) + rainbow (>= 2.2.2, < 4.0) + regexp_parser (>= 1.8, < 3.0) + rexml (>= 3.2.5, < 4.0) + rubocop-ast (>= 1.30.0, < 2.0) + ruby-progressbar (~> 1.7) + unicode-display_width (>= 2.4.0, < 3.0) + rubocop-ast (1.31.1) + parser (>= 3.3.0.4) + rubocop-performance (1.20.2) + rubocop (>= 1.48.1, < 2.0) + rubocop-ast (>= 1.30.0, < 2.0) + ruby-progressbar (1.13.0) + simplecov (0.22.0) + docile (~> 1.1) + simplecov-html (~> 0.11) + simplecov_json_formatter (~> 0.1) + simplecov-html (0.12.3) + simplecov_json_formatter (0.1.4) + standard (1.34.0) + language_server-protocol (~> 3.17.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.60) + standard-custom (~> 1.0.0) + standard-performance (~> 1.3) + standard-custom (1.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.50) + standard-performance (1.3.1) + lint_roller (~> 1.1) + rubocop-performance (~> 1.20.2) + stringio (3.1.0) + timeout (0.4.1) + tzinfo (2.0.6) + concurrent-ruby (~> 1.0) + unicode-display_width (2.5.0) + +PLATFORMS + arm64-darwin-22 + x86_64-linux + +DEPENDENCIES + debug + riverqueue! + riverqueue-activerecord! + rspec-core + rspec-expectations + simplecov + standard + +BUNDLED WITH + 2.4.20 diff --git a/README.md b/drivers/riverqueue-activerecord/README.md similarity index 53% rename from README.md rename to drivers/riverqueue-activerecord/README.md index afa2498..265dd01 100644 --- a/README.md +++ b/drivers/riverqueue-activerecord/README.md @@ -1,8 +1,8 @@ -# River Ruby bindings +# River Ruby bindings ActiveRecord driver A future home for River's Ruby bindings. For now, the [Gem is registered](https://rubygems.org/gems/riverqueue), but nothing else is done. ``` sh -$ gem build riverqueue.gemspec -$ gem push riverqueue-0.0.1.gem +$ gem build riverqueue-activerecord.gemspec +$ gem push riverqueue-activerecord-0.0.1.gem ``` diff --git a/drivers/riverqueue-activerecord/docs/README.md b/drivers/riverqueue-activerecord/docs/README.md new file mode 100644 index 0000000..7d01cf8 --- /dev/null +++ b/drivers/riverqueue-activerecord/docs/README.md @@ -0,0 +1,23 @@ +# riverqueue-sequel [![Build Status](https://github.com/riverqueue/riverqueue-ruby-sequel/workflows/CI/badge.svg)](https://github.com/riverqueue/riverqueue-ruby-sequel/actions) + +[ActiveRecord](https://github.com/jeremyevans/sequel) driver for [River](https://github.com/riverqueue/river)'s [`riverqueue` gem for Ruby](https://rubygems.org/gems/riverqueue). + +`Gemfile` should contain the core gem and a driver like this one: + +``` yaml +gem "riverqueue" +gem "riverqueue-sequel" +``` + +Initialize a client with: + +```ruby +DB = ActiveRecord.connect("postgres://...") +client = River::Client.new(River::Driver::ActiveRecord.new(DB)) +``` + +See also [`rubyqueue`](https://github.com/riverqueue/riverqueue-ruby). + +## Development + +See [development](./development.md). diff --git a/drivers/riverqueue-activerecord/docs/development.md b/drivers/riverqueue-activerecord/docs/development.md new file mode 100644 index 0000000..2f85549 --- /dev/null +++ b/drivers/riverqueue-activerecord/docs/development.md @@ -0,0 +1,48 @@ +# riverqueue-ruby development + +## Install dependencies + +```shell +$ bundle install +``` +## Run tests + +Create a test database and migrate with River's CLI: + +```shell +$ go install github.com/riverqueue/river/cmd/river +$ createdb riverqueue_ruby_test +$ river migrate-up --database-url "postgres://localhost/riverqueue_ruby_test" +``` + +Run all specs: + +```shell +$ bundle exec rspec spec +``` + +## Run lint + +```shell +$ standardrb --fix +``` + +## Code coverage + +Running the entire test suite will produce a coverage report, and will fail if line and branch coverage is below 100%. Run the suite and open `coverage/index.html` to find lines or branches that weren't covered: + +```shell +$ bundle exec rspec spec +$ open coverage/index.html +``` + +## Publish a new gem + +```shell +git checkout master && git pull --rebase +VERSION=v0.0.x +gem build riverqueue-sequel.gemspec +gem push riverqueue-sequel-$VERSION.gem +git tag $VERSION +git push --tags +``` \ No newline at end of file diff --git a/drivers/riverqueue-activerecord/lib/driver.rb b/drivers/riverqueue-activerecord/lib/driver.rb new file mode 100644 index 0000000..4ad7317 --- /dev/null +++ b/drivers/riverqueue-activerecord/lib/driver.rb @@ -0,0 +1,88 @@ +module River::Driver + # Provides a ActiveRecord driver for River. + # + # Used in conjunction with a River client like: + # + # DB = ActiveRecord.connect("postgres://...") + # client = River::Client.new(River::Driver::ActiveRecord.new(DB)) + # + class ActiveRecord + def initialize + # It's Ruby, so we can only define a model after ActiveRecord's established a + # connection because it's all dynamic. + if !River::Driver::ActiveRecord.const_defined?(:RiverJob) + River::Driver::ActiveRecord.const_set(:RiverJob, Class.new(::ActiveRecord::Base) do + self.table_name = "river_job" + + # Unfortunately, Rails errors if you have a column called `errors` and + # provides no way to remap names (beyond ignoring a column, which we + # really don't want). This patch is in place so we can hydrate this + # model at all without ActiveRecord self-immolating. + def self.dangerous_attribute_method?(method_name) + return false if method_name == "errors" + super + end + end) + end + end + + def insert(insert_params) + # the call to `#compact` is important so that we remove nils and table + # default values get picked up instead + to_job_row( + RiverJob.insert( + { + args: insert_params.encoded_args, + kind: insert_params.kind, + max_attempts: insert_params.max_attempts, + priority: insert_params.priority, + queue: insert_params.queue, + state: insert_params.state, + scheduled_at: insert_params.scheduled_at, + tags: insert_params.tags + }.compact, + returning: Arel.sql("*") + ).first + ) + end + + # Type type injected to this method is not a `RiverJob`, but rather a raw + # hash with stringified keys because we're inserting with the Arel framework + # directly rather than generating a record from a model. + private def to_job_row(raw_job) + deserialize = ->(field) do + RiverJob._default_attributes[field].type.deserialize(raw_job[field]) + end + + # Errors is `jsonb[]` so the subtype here will decode `jsonb`. + errors_subtype = RiverJob._default_attributes["errors"].type.subtype + + River::JobRow.new( + id: deserialize.call("id"), + args: deserialize.call("args").yield_self { |a| a ? JSON.parse(a) : nil }, + attempt: deserialize.call("attempt"), + attempted_at: deserialize.call("attempted_at"), + attempted_by: deserialize.call("attempted_by"), + created_at: deserialize.call("created_at"), + errors: deserialize.call("errors")&.map do |e| + deserialized_error = errors_subtype.deserialize(e) + + River::AttemptError.new( + at: Time.parse(deserialized_error["at"]), + attempt: deserialized_error["attempt"], + error: deserialized_error["error"], + trace: deserialized_error["trace"] + ) + end, + finalized_at: deserialize.call("finalized_at"), + kind: deserialize.call("kind"), + max_attempts: deserialize.call("max_attempts"), + priority: deserialize.call("priority"), + queue: deserialize.call("queue"), + scheduled_at: deserialize.call("scheduled_at"), + state: deserialize.call("state"), + tags: deserialize.call("tags") + ) + end + end +end diff --git a/drivers/riverqueue-activerecord/lib/riverqueue-activerecord.rb b/drivers/riverqueue-activerecord/lib/riverqueue-activerecord.rb new file mode 100644 index 0000000..82bf467 --- /dev/null +++ b/drivers/riverqueue-activerecord/lib/riverqueue-activerecord.rb @@ -0,0 +1,6 @@ +require "active_record" + +require_relative "driver" + +module River +end diff --git a/drivers/riverqueue-activerecord/riverqueue-activerecord.gemspec b/drivers/riverqueue-activerecord/riverqueue-activerecord.gemspec new file mode 100644 index 0000000..1f03105 --- /dev/null +++ b/drivers/riverqueue-activerecord/riverqueue-activerecord.gemspec @@ -0,0 +1,15 @@ +Gem::Specification.new do |s| + s.name = "riverqueue-activerecord" + s.version = "0.0.1" + s.summary = "ActiveRecord driver for the River Ruby gem." + s.description = "ActiveRecord driver for the River Ruby gem." + s.authors = ["Blake Gentry", "Brandur Leach"] + s.email = "brandur@brandur.org" + s.files = ["lib/riverqueue-activerecord.rb"] + s.homepage = "https://riverqueue.com" + s.license = "LGPL-3.0-or-later" + + s.add_dependency "activerecord" + s.add_dependency "activesupport" # required for ActiveRecord to load properly + s.add_dependency "pg" +end diff --git a/drivers/riverqueue-activerecord/spec/driver_spec.rb b/drivers/riverqueue-activerecord/spec/driver_spec.rb new file mode 100644 index 0000000..6b7b745 --- /dev/null +++ b/drivers/riverqueue-activerecord/spec/driver_spec.rb @@ -0,0 +1,202 @@ +require "spec_helper" + +class SimpleArgs + attr_accessor :job_num + + def initialize(job_num:) + self.job_num = job_num + end + + def kind = "simple" + + def to_json = JSON.dump({job_num: job_num}) +end + +# Lets us test job-specific insertion opts by making `#insert_opts` an accessor. +# Real args that make use of this functionality will probably want to make +# `#insert_opts` a non-accessor method instead. +class SimpleArgsWithInsertOpts < SimpleArgs + attr_accessor :insert_opts +end + +RSpec.describe River::Driver::ActiveRecord do + around(:each) { |ex| test_transaction(&ex) } + + let!(:driver) { River::Driver::ActiveRecord.new } + let(:client) { River::Client.new(driver) } + + describe "#insert" do + it "inserts a job" do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + expect(insert_res.job).to have_attributes( + args: {"job_num" => 1}, + attempt: 0, + created_at: be_within(2).of(Time.now.utc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.utc), + state: River::JOB_STATE_AVAILABLE, + tags: [] + ) + + # Make sure it made it to the database. Assert only minimally since we're + # certain it's the same as what we checked above. + river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) + expect(river_job).to have_attributes( + kind: "simple" + ) + end + + it "schedules a job" do + target_time = Time.now.utc + 1 * 3600 + + insert_res = client.insert( + SimpleArgs.new(job_num: 1), + insert_opts: River::InsertOpts.new(scheduled_at: target_time) + ) + expect(insert_res.job).to have_attributes( + scheduled_at: be_within(2).of(target_time), + state: River::JOB_STATE_SCHEDULED + ) + end + + it "inserts with job insert opts" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + insert_res = client.insert(args) + expect(insert_res.job).to have_attributes( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + end + + it "inserts with insert opts" do + # We set job insert opts in this spec too so that we can verify that the + # options passed at insertion time take precedence. + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + insert_res = client.insert(args, insert_opts: River::InsertOpts.new( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + )) + expect(insert_res.job).to have_attributes( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + ) + end + + it "inserts with job args hash" do + insert_res = client.insert(River::JobArgsHash.new("hash_kind", { + job_num: 1 + })) + expect(insert_res.job).to have_attributes( + args: {"job_num" => 1}, + kind: "hash_kind" + ) + end + + it "inserts in a transaction" do + insert_res = nil + + ActiveRecord::Base.transaction(requires_new: true) do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + + river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) + expect(river_job).to_not be_nil + + raise ActiveRecord::Rollback + end + + # Not visible because the job was rolled back. + river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) + expect(river_job).to be_nil + end + end + + describe "#to_job_row" do + it "converts a database record to `River::JobRow`" do + now = Time.now.utc + river_job = { + id: 1, + attempt: 1, + attempted_at: now, + attempted_by: ["client1"], + created_at: now, + args: JSON.generate(%({"job_num":1})), # encoded twice, like how ActiveRecord returns it + finalized_at: now, + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: now, + state: River::JOB_STATE_COMPLETED, + tags: ["tag1"] + }.transform_keys { |k| k.to_s } + + job_row = driver.send(:to_job_row, river_job) + + expect(job_row).to be_an_instance_of(River::JobRow) + expect(job_row).to have_attributes( + id: 1, + args: {"job_num" => 1}, + attempt: 1, + attempted_at: now, + attempted_by: ["client1"], + created_at: now, + finalized_at: now, + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: now, + state: River::JOB_STATE_COMPLETED, + tags: ["tag1"] + ) + end + + it "with errors" do + now = Time.now.utc + river_job = { + errors: [JSON.dump( + { + at: now, + attempt: 1, + error: "job failure", + trace: "error trace" + } + )] + }.transform_keys { |k| k.to_s } + + job_row = driver.send(:to_job_row, river_job) + + expect(job_row.errors.count).to be(1) + expect(job_row.errors[0]).to be_an_instance_of(River::AttemptError) + expect(job_row.errors[0]).to have_attributes( + at: now.floor(0), + attempt: 1, + error: "job failure", + trace: "error trace" + ) + end + end +end diff --git a/drivers/riverqueue-activerecord/spec/spec_helper.rb b/drivers/riverqueue-activerecord/spec/spec_helper.rb new file mode 100644 index 0000000..a81f76f --- /dev/null +++ b/drivers/riverqueue-activerecord/spec/spec_helper.rb @@ -0,0 +1,20 @@ +require "active_record" +require "debug" + +ActiveRecord::Base.establish_connection(ENV["TEST_DATABASE_URL"] || "postgres://localhost/riverqueue_ruby_test") + +def test_transaction + ActiveRecord::Base.transaction do + yield + raise ActiveRecord::Rollback + end +end + +require "simplecov" +SimpleCov.start do + enable_coverage :branch + minimum_coverage line: 100, branch: 100 +end + +require "riverqueue" +require "riverqueue-activerecord" diff --git a/drivers/riverqueue-sequel/Gemfile b/drivers/riverqueue-sequel/Gemfile new file mode 100644 index 0000000..be9359d --- /dev/null +++ b/drivers/riverqueue-sequel/Gemfile @@ -0,0 +1,14 @@ +source "https://rubygems.org" + +gemspec + +group :development, :test do + gem "riverqueue", path: "../.." + gem "standard" +end + +group :test do + gem "rspec-core" + gem "rspec-expectations" + gem "simplecov", require: false +end diff --git a/drivers/riverqueue-sequel/Gemfile.lock b/drivers/riverqueue-sequel/Gemfile.lock new file mode 100644 index 0000000..6c7d55f --- /dev/null +++ b/drivers/riverqueue-sequel/Gemfile.lock @@ -0,0 +1,90 @@ +PATH + remote: ../.. + specs: + riverqueue (0.0.1) + +PATH + remote: . + specs: + riverqueue-sequel (0.0.1) + pg + sequel + +GEM + remote: https://rubygems.org/ + specs: + ast (2.4.2) + bigdecimal (3.1.4) + diff-lcs (1.5.0) + docile (1.4.0) + json (2.7.1) + language_server-protocol (3.17.0.3) + lint_roller (1.1.0) + parallel (1.24.0) + parser (3.3.0.5) + ast (~> 2.4.1) + racc + pg (1.5.4) + racc (1.7.3) + rainbow (3.1.1) + regexp_parser (2.9.0) + rexml (3.2.6) + rspec-core (3.12.2) + rspec-support (~> 3.12.0) + rspec-expectations (3.12.3) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.12.0) + rspec-support (3.12.1) + rubocop (1.61.0) + json (~> 2.3) + language_server-protocol (>= 3.17.0) + parallel (~> 1.10) + parser (>= 3.3.0.2) + rainbow (>= 2.2.2, < 4.0) + regexp_parser (>= 1.8, < 3.0) + rexml (>= 3.2.5, < 4.0) + rubocop-ast (>= 1.30.0, < 2.0) + ruby-progressbar (~> 1.7) + unicode-display_width (>= 2.4.0, < 3.0) + rubocop-ast (1.31.1) + parser (>= 3.3.0.4) + rubocop-performance (1.20.2) + rubocop (>= 1.48.1, < 2.0) + rubocop-ast (>= 1.30.0, < 2.0) + ruby-progressbar (1.13.0) + sequel (5.74.0) + bigdecimal + simplecov (0.22.0) + docile (~> 1.1) + simplecov-html (~> 0.11) + simplecov_json_formatter (~> 0.1) + simplecov-html (0.12.3) + simplecov_json_formatter (0.1.4) + standard (1.34.0) + language_server-protocol (~> 3.17.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.60) + standard-custom (~> 1.0.0) + standard-performance (~> 1.3) + standard-custom (1.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.50) + standard-performance (1.3.1) + lint_roller (~> 1.1) + rubocop-performance (~> 1.20.2) + unicode-display_width (2.5.0) + +PLATFORMS + arm64-darwin-22 + x86_64-linux + +DEPENDENCIES + riverqueue! + riverqueue-sequel! + rspec-core + rspec-expectations + simplecov + standard + +BUNDLED WITH + 2.4.20 diff --git a/drivers/riverqueue-sequel/docs/README.md b/drivers/riverqueue-sequel/docs/README.md new file mode 100644 index 0000000..e3f6601 --- /dev/null +++ b/drivers/riverqueue-sequel/docs/README.md @@ -0,0 +1,23 @@ +# riverqueue-sequel [![Build Status](https://github.com/riverqueue/riverqueue-ruby-sequel/workflows/CI/badge.svg)](https://github.com/riverqueue/riverqueue-ruby-sequel/actions) + +[Sequel](https://github.com/jeremyevans/sequel) driver for [River](https://github.com/riverqueue/river)'s [`riverqueue` gem for Ruby](https://rubygems.org/gems/riverqueue). + +`Gemfile` should contain the core gem and a driver like this one: + +``` yaml +gem "riverqueue" +gem "riverqueue-sequel" +``` + +Initialize a client with: + +```ruby +DB = Sequel.connect("postgres://...") +client = River::Client.new(River::Driver::Sequel.new(DB)) +``` + +See also [`rubyqueue`](https://github.com/riverqueue/riverqueue-ruby). + +## Development + +See [development](./development.md). diff --git a/drivers/riverqueue-sequel/docs/development.md b/drivers/riverqueue-sequel/docs/development.md new file mode 100644 index 0000000..2f85549 --- /dev/null +++ b/drivers/riverqueue-sequel/docs/development.md @@ -0,0 +1,48 @@ +# riverqueue-ruby development + +## Install dependencies + +```shell +$ bundle install +``` +## Run tests + +Create a test database and migrate with River's CLI: + +```shell +$ go install github.com/riverqueue/river/cmd/river +$ createdb riverqueue_ruby_test +$ river migrate-up --database-url "postgres://localhost/riverqueue_ruby_test" +``` + +Run all specs: + +```shell +$ bundle exec rspec spec +``` + +## Run lint + +```shell +$ standardrb --fix +``` + +## Code coverage + +Running the entire test suite will produce a coverage report, and will fail if line and branch coverage is below 100%. Run the suite and open `coverage/index.html` to find lines or branches that weren't covered: + +```shell +$ bundle exec rspec spec +$ open coverage/index.html +``` + +## Publish a new gem + +```shell +git checkout master && git pull --rebase +VERSION=v0.0.x +gem build riverqueue-sequel.gemspec +gem push riverqueue-sequel-$VERSION.gem +git tag $VERSION +git push --tags +``` \ No newline at end of file diff --git a/drivers/riverqueue-sequel/lib/driver.rb b/drivers/riverqueue-sequel/lib/driver.rb new file mode 100644 index 0000000..0626499 --- /dev/null +++ b/drivers/riverqueue-sequel/lib/driver.rb @@ -0,0 +1,75 @@ +module River::Driver + # Provides a Sequel driver for River. + # + # Used in conjunction with a River client like: + # + # DB = Sequel.connect("postgres://...") + # client = River::Client.new(River::Driver::Sequel.new(DB)) + # + class Sequel + def initialize(db) + @db = db + + # It's Ruby, so we can only define a model after Sequel's established a + # connection because it's all dynamic. + if !River::Driver::Sequel.const_defined?(:RiverJob) + River::Driver::Sequel.const_set(:RiverJob, Class.new(::Sequel::Model(:river_job))) + + # Since we only define our model once, take advantage of knowing this is + # our first initialization to add required extensions. + db.extension(:pg_array) + end + end + + def insert(insert_params) + # the call to `#compact` is important so that we remove nils and table + # default values get picked up instead + to_job_row( + RiverJob.create( + { + args: insert_params.encoded_args, + kind: insert_params.kind, + max_attempts: insert_params.max_attempts, + priority: insert_params.priority, + queue: insert_params.queue, + state: insert_params.state, + scheduled_at: insert_params.scheduled_at, + tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags) : nil + }.compact + ) + ) + end + + private def to_job_row(river_job) + # needs to be accessed through values because Sequel shadows `errors` + errors = river_job.values[:errors] + + River::JobRow.new( + id: river_job.id, + args: river_job.args ? JSON.parse(river_job.args) : nil, + attempt: river_job.attempt, + attempted_at: river_job.attempted_at, + attempted_by: river_job.attempted_by, + created_at: river_job.created_at, + errors: errors&.map { |e| + deserialized_error = JSON.parse(e, symbolize_names: true) + + River::AttemptError.new( + at: Time.parse(deserialized_error[:at]), + attempt: deserialized_error[:attempt], + error: deserialized_error[:error], + trace: deserialized_error[:trace] + ) + }, + finalized_at: river_job.finalized_at, + kind: river_job.kind, + max_attempts: river_job.max_attempts, + priority: river_job.priority, + queue: river_job.queue, + scheduled_at: river_job.scheduled_at, + state: river_job.state, + tags: river_job.tags + ) + end + end +end diff --git a/drivers/riverqueue-sequel/lib/riverqueue-sequel.rb b/drivers/riverqueue-sequel/lib/riverqueue-sequel.rb new file mode 100644 index 0000000..9c7ba09 --- /dev/null +++ b/drivers/riverqueue-sequel/lib/riverqueue-sequel.rb @@ -0,0 +1,6 @@ +require "sequel" + +require_relative "driver" + +module River +end diff --git a/drivers/riverqueue-sequel/riverqueue-sequel.gemspec b/drivers/riverqueue-sequel/riverqueue-sequel.gemspec new file mode 100644 index 0000000..52c0118 --- /dev/null +++ b/drivers/riverqueue-sequel/riverqueue-sequel.gemspec @@ -0,0 +1,14 @@ +Gem::Specification.new do |s| + s.name = "riverqueue-sequel" + s.version = "0.0.1" + s.summary = "Sequel driver for the River Ruby gem." + s.description = "Sequel driver for the River Ruby gem." + s.authors = ["Blake Gentry", "Brandur Leach"] + s.email = "brandur@brandur.org" + s.files = ["lib/riverqueue-sequel.rb"] + s.homepage = "https://riverqueue.com" + s.license = "LGPL-3.0-or-later" + + s.add_dependency "pg" + s.add_dependency "sequel" +end diff --git a/drivers/riverqueue-sequel/spec/driver_spec.rb b/drivers/riverqueue-sequel/spec/driver_spec.rb new file mode 100644 index 0000000..9c2fde7 --- /dev/null +++ b/drivers/riverqueue-sequel/spec/driver_spec.rb @@ -0,0 +1,202 @@ +require "spec_helper" + +class SimpleArgs + attr_accessor :job_num + + def initialize(job_num:) + self.job_num = job_num + end + + def kind = "simple" + + def to_json = JSON.dump({job_num: job_num}) +end + +# Lets us test job-specific insertion opts by making `#insert_opts` an accessor. +# Real args that make use of this functionality will probably want to make +# `#insert_opts` a non-accessor method instead. +class SimpleArgsWithInsertOpts < SimpleArgs + attr_accessor :insert_opts +end + +RSpec.describe River::Driver::Sequel do + around(:each) { |ex| test_transaction(&ex) } + + let!(:driver) { River::Driver::Sequel.new(DB) } + let(:client) { River::Client.new(driver) } + + describe "#insert" do + it "inserts a job" do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + expect(insert_res.job).to have_attributes( + attempt: 0, + args: {"job_num" => 1}, + created_at: be_within(2).of(Time.now.utc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.utc), + state: River::JOB_STATE_AVAILABLE, + tags: ::Sequel.pg_array([]) + ) + + # Make sure it made it to the database. Assert only minimally since we're + # certain it's the same as what we checked above. + river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) + expect(river_job).to have_attributes( + kind: "simple" + ) + end + + it "schedules a job" do + target_time = Time.now.utc + 1 * 3600 + + insert_res = client.insert( + SimpleArgs.new(job_num: 1), + insert_opts: River::InsertOpts.new(scheduled_at: target_time) + ) + expect(insert_res.job).to have_attributes( + scheduled_at: be_within(2).of(target_time), + state: River::JOB_STATE_SCHEDULED + ) + end + + it "inserts with job insert opts" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + insert_res = client.insert(args) + expect(insert_res.job).to have_attributes( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + end + + it "inserts with insert opts" do + # We set job insert opts in this spec too so that we can verify that the + # options passed at insertion time take precedence. + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + insert_res = client.insert(args, insert_opts: River::InsertOpts.new( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + )) + expect(insert_res.job).to have_attributes( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + ) + end + + it "inserts with job args hash" do + insert_res = client.insert(River::JobArgsHash.new("hash_kind", { + job_num: 1 + })) + expect(insert_res.job).to have_attributes( + args: {"job_num" => 1}, + kind: "hash_kind" + ) + end + + it "inserts in a transaction" do + insert_res = nil + + DB.transaction(savepoint: true) do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + + river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) + expect(river_job).to_not be_nil + + raise Sequel::Rollback + end + + # Not visible because the job was rolled back. + river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) + expect(river_job).to be_nil + end + end + + describe "#to_job_row" do + it "converts a database record to `River::JobRow`" do + now = Time.now.utc + river_job = River::Driver::Sequel::RiverJob.new( + attempt: 1, + attempted_at: now, + attempted_by: ["client1"], + created_at: now, + args: %({"job_num":1}), + finalized_at: now, + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: now, + state: River::JOB_STATE_COMPLETED, + tags: ["tag1"] + ) + river_job.id = 1 + + job_row = driver.send(:to_job_row, river_job) + + expect(job_row).to be_an_instance_of(River::JobRow) + expect(job_row).to have_attributes( + id: 1, + args: {"job_num" => 1}, + attempt: 1, + attempted_at: now, + attempted_by: ["client1"], + created_at: now, + finalized_at: now, + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: now, + state: River::JOB_STATE_COMPLETED, + tags: ["tag1"] + ) + end + + it "with errors" do + now = Time.now.utc + river_job = River::Driver::Sequel::RiverJob.new( + errors: [JSON.dump( + { + at: now, + attempt: 1, + error: "job failure", + trace: "error trace" + } + )] + ) + + job_row = driver.send(:to_job_row, river_job) + + expect(job_row.errors.count).to be(1) + expect(job_row.errors[0]).to be_an_instance_of(River::AttemptError) + expect(job_row.errors[0]).to have_attributes( + at: now.floor(0), + attempt: 1, + error: "job failure", + trace: "error trace" + ) + end + end +end diff --git a/drivers/riverqueue-sequel/spec/spec_helper.rb b/drivers/riverqueue-sequel/spec/spec_helper.rb new file mode 100644 index 0000000..4a1d589 --- /dev/null +++ b/drivers/riverqueue-sequel/spec/spec_helper.rb @@ -0,0 +1,19 @@ +require "sequel" + +DB = Sequel.connect(ENV["TEST_DATABASE_URL"] || "postgres://localhost/riverqueue_ruby_test") + +def test_transaction + DB.transaction do + yield + raise Sequel::Rollback + end +end + +require "simplecov" +SimpleCov.start do + enable_coverage :branch + minimum_coverage line: 100, branch: 100 +end + +require "riverqueue" +require "riverqueue-sequel" diff --git a/lib/client.rb b/lib/client.rb new file mode 100644 index 0000000..58d5bbc --- /dev/null +++ b/lib/client.rb @@ -0,0 +1,110 @@ +module River + MAX_ATTEMPTS_DEFAULT = 25 + PRIORITY_DEFAULT = 1 + QUEUE_DEFAULT = "default" + + # Provides a client for River that inserts jobs. Unlike the Go version of the + # River client, this one can insert jobs only. Jobs can only be worked from Go + # code, so job arg kinds and JSON encoding details must be shared between Ruby + # and Go code. + # + # Used in conjunction with a River driver like: + # + # DB = Sequel.connect(...) + # client = River::Client.new(River::Driver::Sequel.new(DB)) + # + # River drivers are found in separate gems like `riverqueue-sequel` to help + # minimize transient dependencies. + class Client + def initialize(driver) + @driver = driver + end + + # Inserts a new job for work given a job args implementation and insertion + # options (which may be omitted). + # + # Job arg implementations are expected to respond to: + # + # * `#kind`: A string that uniquely identifies the job in the database. + # * `#to_json`: Encodes the args to JSON for persistence in the database. + # Must match encoding an args struct on the Go side to be workable. + # + # They may also respond to `#insert_opts` which is expected to return an + # `InsertOpts` that contains options that will apply to all jobs of this + # kind. Insertion options provided as an argument to `#insert` override + # those returned by job args. + # + # Returns an instance of InsertResult. + def insert(args, insert_opts: InsertOpts.new) + raise "args should respond to `#kind`" if !args.respond_to?(:kind) + + # ~all objects in Ruby respond to `#to_json`, so check non-nil instead. + args_json = args.to_json + raise "args should return non-nil from `#to_json`" if !args_json + + args_insert_opts = if args.respond_to?(:insert_opts) + args_with_insert_opts = args #: _JobArgsWithInsertOpts # rubocop:disable Layout/LeadingCommentSpace + args_with_insert_opts.insert_opts || InsertOpts.new + else + InsertOpts.new + end + + scheduled_at = insert_opts.scheduled_at || args_insert_opts.scheduled_at + + job = @driver.insert(Driver::JobInsertParams.new( + encoded_args: args_json, + kind: args.kind, + max_attempts: insert_opts.max_attempts || args_insert_opts.max_attempts || MAX_ATTEMPTS_DEFAULT, + priority: insert_opts.priority || args_insert_opts.priority || PRIORITY_DEFAULT, + queue: insert_opts.queue || args_insert_opts.queue || QUEUE_DEFAULT, + scheduled_at: scheduled_at&.utc, # database defaults to now + state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE, + tags: insert_opts.tags || args_insert_opts.tags + )) + + InsertResult.new(job) + end + + # Inserts many new jobs as part of a single batch operation for improved + # efficiency. + # + # Takes an array of job args or InsertManyParams which encapsulate job args + # and a paired InsertOpts. + # + # Job arg implementations are expected to respond to: + # + # * `#kind`: A string that uniquely identifies the job in the database. + # * `#to_json`: Encodes the args to JSON for persistence in the database. + # Must match encoding an args struct on the Go side to be workable. + # + # Returns the number of jobs inserted. + def insert_many(args) + raise "sorry, not implemented yet" + end + end + + # A single job to insert that's part of an #insert_many batch insert. Unlike + # sending raw job args, supports an InsertOpts to pair with the job. + class InsertManyParams + # Job args to insert. + attr_reader :args + + # Insertion options to use with the insert. + attr_reader :insert_opts + + def initialize(args, insert_opts: nil) + @args = args + @insert_opts = insert_opts + end + end + + # Result of a single insertion. + class InsertResult + # Inserted job row. + attr_reader :job + + def initialize(job) + @job = job + end + end +end diff --git a/lib/driver.rb b/lib/driver.rb new file mode 100644 index 0000000..97045fd --- /dev/null +++ b/lib/driver.rb @@ -0,0 +1,47 @@ +module River + # Contains an interface used by the top-level River module to interface with + # its driver implementations. All types and methods in this module should be + # considered to be for internal use only and subject to change. API stability + # is not guaranteed. + module Driver + # Insert parameters for a job. This is sent to underlying drivers and is meant + # for internal use only. Its interface is subject to change. + class JobInsertParams + attr_accessor :encoded_args + attr_accessor :kind + attr_accessor :max_attempts + attr_accessor :priority + attr_accessor :queue + attr_accessor :scheduled_at + attr_accessor :state + attr_accessor :tags + + # TODO(brandur): Get these supported. + # attr_accessor :unique + # attr_accessor :unique_by_args + # attr_accessor :unique_by_period + # attr_accessor :unique_by_queue + # attr_accessor :unique_by_state + + def initialize( + encoded_args:, + kind:, + max_attempts:, + priority:, + queue:, + scheduled_at:, + state:, + tags: + ) + self.encoded_args = encoded_args + self.kind = kind + self.max_attempts = max_attempts + self.priority = priority + self.queue = queue + self.scheduled_at = scheduled_at + self.state = state + self.tags = tags + end + end + end +end diff --git a/lib/insert_opts.rb b/lib/insert_opts.rb new file mode 100644 index 0000000..98035c0 --- /dev/null +++ b/lib/insert_opts.rb @@ -0,0 +1,65 @@ +module River + class InsertOpts + # MaxAttempts is the maximum number of total attempts (including both the + # original run and all retries) before a job is abandoned and set as + # discarded. + attr_accessor :max_attempts + + # Priority is the priority of the job, with 1 being the highest priority and + # 4 being the lowest. When fetching available jobs to work, the highest + # priority jobs will always be fetched before any lower priority jobs are + # fetched. Note that if your workers are swamped with more high-priority jobs + # then they can handle, lower priority jobs may not be fetched. + # + # Defaults to PRIORITY_DEFAULT. + attr_accessor :priority + + # Queue is the name of the job queue in which to insert the job. + # + # Defaults to QUEUE_DEFAULT. + attr_accessor :queue + + # ScheduledAt is a time in future at which to schedule the job (i.e. in + # cases where it shouldn't be run immediately). The job is guaranteed not + # to run before this time, but may run slightly after depending on the + # number of other scheduled jobs and how busy the queue is. + # + # Use of this option generally only makes sense when passing options into + # Insert rather than when a job args is returning `#insert_opts`, however, + # it will work in both cases. + attr_accessor :scheduled_at + + # Tags are an arbitrary list of keywords to add to the job. They have no + # functional behavior and are meant entirely as a user-specified construct + # to help group and categorize jobs. + # + # If tags are specified from both a job args override and from options on + # Insert, the latter takes precedence. Tags are not merged. + attr_accessor :tags + + # UniqueOpts returns options relating to job uniqueness. An empty struct + # avoids setting any worker-level unique options. + # + # TODO: Implement. + attr_accessor :unique_opts + + def initialize( + max_attempts: nil, + priority: nil, + queue: nil, + scheduled_at: nil, + tags: nil, + unique_opts: nil + ) + self.max_attempts = max_attempts + self.priority = priority + self.queue = queue + self.scheduled_at = scheduled_at + self.tags = tags + self.unique_opts = unique_opts + end + end + + class UniqueOpts + end +end diff --git a/lib/job.rb b/lib/job.rb new file mode 100644 index 0000000..58e32c9 --- /dev/null +++ b/lib/job.rb @@ -0,0 +1,169 @@ +module River + JOB_STATE_AVAILABLE = "available" + JOB_STATE_CANCELLED = "cancelled" + JOB_STATE_COMPLETED = "completed" + JOB_STATE_DISCARDED = "discarded" + JOB_STATE_RETRYABLE = "retryable" + JOB_STATE_RUNNING = "running" + JOB_STATE_SCHEDULED = "scheduled" + + # Provides a way of creating a job args from a simple Ruby hash for a quick + # way to insert a job without having to define a class. The first argument is + # a "kind" string for identifying the job in the database and the second is a + # hash that will be encoded to JSON. + class JobArgsHash + def initialize(kind, hash) + raise "kind should be non-nil" if !kind + raise "hash should be non-nil" if !hash + + @kind = kind + @hash = hash + end + + attr_reader :kind + + def to_json + JSON.dump(@hash) + end + end + + # JobRow contains the properties of a job that are persisted to the database. + class JobRow + # ID of the job. Generated as part of a Postgres sequence and generally + # ascending in nature, but there may be gaps in it as transactions roll + # back. + attr_accessor :id + + # The job's args as a hash decoded from JSON. + attr_accessor :args + + # The attempt number of the job. Jobs are inserted at 0, the number is + # incremented to 1 the first time work its worked, and may increment further + # if it's either snoozed or errors. + attr_accessor :attempt + + # The time that the job was last worked. Starts out as `nil` on a new + # insert. + attr_accessor :attempted_at + + # The set of worker IDs that have worked this job. A worker ID differs + # between different programs, but is shared by all executors within any + # given one. (i.e. Different Go processes have different IDs, but IDs are + # shared within any given process.) A process generates a new ULID (an + # ordered UUID) worker ID when it starts up. + attr_accessor :attempted_by + + # When the job record was created. + attr_accessor :created_at + + # A set of errors that occurred when the job was worked, one for each + # attempt. Ordered from earliest error to the latest error. + attr_accessor :errors + + # The time at which the job was "finalized", meaning it was either completed + # successfully or errored for the last time such that it'll no longer be + # retried. + attr_accessor :finalized_at + + # Kind uniquely identifies the type of job and instructs which worker + # should work it. It is set at insertion time via `#kind` on job args. + attr_accessor :kind + + # The maximum number of attempts that the job will be tried before it errors + # for the last time and will no longer be worked. + attr_accessor :max_attempts + + # The priority of the job, with 1 being the highest priority and 4 being the + # lowest. When fetching available jobs to work, the highest priority jobs + # will always be fetched before any lower priority jobs are fetched. Note + # that if your workers are swamped with more high-priority jobs then they + # can handle, lower priority jobs may not be fetched. + attr_accessor :priority + + # The name of the queue where the job will be worked. Queues can be + # configured independently and be used to isolate jobs. + attr_accessor :queue + + # When the job is scheduled to become available to be worked. Jobs default + # to running immediately, but may be scheduled for the future when they're + # inserted. They may also be scheduled for later because they were snoozed + # or because they errored and have additional retry attempts remaining. + attr_accessor :scheduled_at + + # The state of job like `available` or `completed`. Jobs are `available` + # when they're first inserted. + attr_accessor :state + + # Tags are an arbitrary list of keywords to add to the job. They have no + # functional behavior and are meant entirely as a user-specified construct + # to help group and categorize jobs. + attr_accessor :tags + + def initialize( + id:, + args:, + attempt:, + created_at:, + kind:, + max_attempts:, + priority:, + queue:, + scheduled_at:, + state:, + + # nullable/optional + attempted_at: nil, + attempted_by: nil, + errors: nil, + finalized_at: nil, + tags: nil + ) + self.id = id + self.args = args + self.attempt = attempt + self.attempted_at = attempted_at + self.attempted_by = attempted_by + self.created_at = created_at + self.errors = errors + self.finalized_at = finalized_at + self.kind = kind + self.max_attempts = max_attempts + self.priority = priority + self.queue = queue + self.scheduled_at = scheduled_at + self.state = state + self.tags = tags + end + end + + # A failed job work attempt containing information about the error or panic + # that occurred. + class AttemptError + # The time at which the error occurred. + attr_accessor :at + + # The attempt number on which the error occurred (maps to #attempt on a job + # row). + attr_accessor :attempt + + # Contains the stringified error of an error returned from a job or a panic + # value in case of a panic. + attr_accessor :error + + # Contains a stack trace from a job that panicked. The trace is produced by + # invoking `debug.Trace()`. + attr_accessor :trace + + def initialize( + at:, + attempt:, + error:, + trace: + ) + self.at = at + self.attempt = attempt + self.error = error + self.trace = trace + end + end +end diff --git a/lib/riverqueue.rb b/lib/riverqueue.rb index e69de29..e77b938 100644 --- a/lib/riverqueue.rb +++ b/lib/riverqueue.rb @@ -0,0 +1,9 @@ +require "json" + +require_relative "client" +require_relative "driver" +require_relative "insert_opts" +require_relative "job" + +module River +end diff --git a/riverqueue.gemspec b/riverqueue.gemspec index 1da7766..c7e2ba3 100644 --- a/riverqueue.gemspec +++ b/riverqueue.gemspec @@ -1,11 +1,11 @@ Gem::Specification.new do |s| - s.name = "riverqueue" - s.version = "0.0.1" - s.summary = "A fast job queue for Go" + s.name = "riverqueue" + s.version = "0.0.1" + s.summary = "A fast job queue for Go." s.description = "A fast job queue for Go." - s.authors = ["Blake Gentry", "Brandur Leach"] - s.email = "brandur@brandur.org" - s.files = ["lib/riverqueue.rb"] - s.homepage = "https://riverqueue.com" - s.license = "LGPL-3.0-or-later" + s.authors = ["Blake Gentry", "Brandur Leach"] + s.email = "brandur@brandur.org" + s.files = ["lib/riverqueue.rb"] + s.homepage = "https://riverqueue.com" + s.license = "LGPL-3.0-or-later" end diff --git a/sig/client.rbs b/sig/client.rbs new file mode 100644 index 0000000..b45a73c --- /dev/null +++ b/sig/client.rbs @@ -0,0 +1,31 @@ +module River + MAX_ATTEMPTS_DEFAULT: Integer + PRIORITY_DEFAULT: Integer + QUEUE_DEFAULT: String + + class Client + @driver: _Driver + + def initialize: (_Driver driver) -> void + def insert: (jobArgs, ?insert_opts: InsertOpts) -> InsertResult + def insert_many: (Array[jobArgs | InsertManyParams]) -> Integer + end + + class InsertManyParams + @args: jobArgs + @insert_opts: InsertOpts? + + attr_reader args: jobArgs + attr_reader insert_opts: InsertOpts? + + def initialize: (jobArgs job, ?insert_opts: InsertOpts?) -> void + end + + class InsertResult + @job: JobRow + + attr_reader job: JobRow + + def initialize: (JobRow job) -> void + end +end diff --git a/sig/driver.rbs b/sig/driver.rbs new file mode 100644 index 0000000..2d4c9b8 --- /dev/null +++ b/sig/driver.rbs @@ -0,0 +1,27 @@ +module River + interface _Driver + def insert: (Driver::JobInsertParams) -> JobRow + end + + module Driver + class JobInsertParams + attr_accessor encoded_args: String + attr_accessor kind: String + attr_accessor max_attempts: Integer + attr_accessor priority: Integer + attr_accessor queue: String + attr_accessor scheduled_at: Time? + attr_accessor state: jobStateAll + attr_accessor tags: Array[String]? + + # TODO(brandur): Get these supported. + # attr_accessor :unique + # attr_accessor :unique_by_args + # attr_accessor :unique_by_period + # attr_accessor :unique_by_queue + # attr_accessor :unique_by_state + + def initialize: (encoded_args: String, kind: String, max_attempts: Integer, priority: Integer, queue: String, scheduled_at: Time?, state: jobStateAll, tags: Array[String]?) -> void + end + end +end diff --git a/sig/insert_opts.rbs b/sig/insert_opts.rbs new file mode 100644 index 0000000..387905a --- /dev/null +++ b/sig/insert_opts.rbs @@ -0,0 +1,21 @@ +module River + class InsertOpts + attr_accessor max_attempts: Integer? + + attr_accessor priority: Integer? + + attr_accessor queue: String? + + attr_accessor scheduled_at: Time? + + attr_accessor tags: Array[String]? + + attr_accessor unique_opts: UniqueOpts? + + def initialize: (?max_attempts: Integer?, ?priority: Integer?, ?queue: String?, ?scheduled_at: Time?, ?tags: Array[String]?, ?unique_opts: UniqueOpts?) -> void + end + + class UniqueOpts + # TODO + end +end diff --git a/sig/job.rbs b/sig/job.rbs new file mode 100644 index 0000000..78bda83 --- /dev/null +++ b/sig/job.rbs @@ -0,0 +1,64 @@ +module River + JOB_STATE_AVAILABLE: "available" + JOB_STATE_CANCELLED: "cancelled" + JOB_STATE_COMPLETED: "completed" + JOB_STATE_DISCARDED: "discarded" + JOB_STATE_RETRYABLE: "retryable" + JOB_STATE_RUNNING: "running" + JOB_STATE_SCHEDULED: "scheduled" + + type jobStateAll = "available" | "cancelled" | "completed" | "discarded" | "retryable" | "running" | "scheduled" + + interface _JobArgs + def kind: () -> String + def respond_to?: (Symbol) -> bool + def to_json: () -> String + end + + interface _JobArgsWithInsertOpts + include _JobArgs + + def insert_opts: () -> InsertOpts? + end + + type jobArgs = _JobArgs | _JobArgsWithInsertOpts + + class JobArgsHash + @kind: String + @hash: Hash[String | Symbol, untyped] + + attr_reader kind: String + + def initialize: (String kind, Hash[String | Symbol, untyped] hash) -> void + def to_json: () -> String + end + + class JobRow + attr_accessor id: Integer + attr_accessor args: Hash[String, untyped] + attr_accessor attempt: Integer + attr_accessor attempted_at: Time? + attr_accessor attempted_by: String? + attr_accessor created_at: Time + attr_accessor errors: Array[AttemptError]? + attr_accessor finalized_at: Time? + attr_accessor kind: String + attr_accessor max_attempts: Integer + attr_accessor priority: Integer + attr_accessor queue: String + attr_accessor scheduled_at: Time + attr_accessor state: jobStateAll + attr_accessor tags: Array[String]? + + def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?) -> void + end + + class AttemptError + attr_accessor at: Time + attr_accessor attempt: Integer + attr_accessor error: String + attr_accessor trace: String + + def initialize: (at: Time, attempt: Integer, error: String, trace: String) -> void + end +end diff --git a/sig/riverqueue.rbs b/sig/riverqueue.rbs new file mode 100644 index 0000000..6ad50d4 --- /dev/null +++ b/sig/riverqueue.rbs @@ -0,0 +1,2 @@ +module River +end diff --git a/spec/client_spec.rb b/spec/client_spec.rb new file mode 100644 index 0000000..88a7437 --- /dev/null +++ b/spec/client_spec.rb @@ -0,0 +1,173 @@ +require "spec_helper" + +# We use a mock here, but each driver has a more comprehensive test suite that +# performs full integration level tests. +class MockDriver + def initialize + @insert_params = [] + @next_id = 0 + end + + def insert(insert_params) + @insert_params << insert_params + + River::JobRow.new( + id: (@next_id += 1), + args: JSON.parse(insert_params.encoded_args), + attempt: 0, + attempted_by: nil, + created_at: Time.now, + errors: nil, + finalized_at: nil, + kind: insert_params.kind, + max_attempts: insert_params.max_attempts, + priority: insert_params.priority, + queue: insert_params.queue, + scheduled_at: insert_params.scheduled_at || Time.now, # normally defaults from DB + state: insert_params.state, + tags: insert_params.tags + ) + end +end + +class SimpleArgs + attr_accessor :job_num + + def initialize(job_num:) + self.job_num = job_num + end + + def kind = "simple" + + def to_json = JSON.dump({job_num: job_num}) +end + +# Lets us test job-specific insertion opts by making `#insert_opts` an accessor. +# Real args that make use of this functionality will probably want to make +# `#insert_opts` a non-accessor method instead. +class SimpleArgsWithInsertOpts < SimpleArgs + attr_accessor :insert_opts +end + +RSpec.describe River::Client do + let(:client) { River::Client.new(mock_driver) } + let(:mock_driver) { MockDriver.new } + + describe "#insert" do + it "inserts a job with defaults" do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + expect(insert_res.job).to have_attributes( + id: 1, + args: {"job_num" => 1}, + attempt: 0, + created_at: be_within(2).of(Time.now), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: be_within(2).of(Time.now), + state: River::JOB_STATE_AVAILABLE, + tags: nil + ) + end + + it "schedules a job" do + target_time = Time.now + 1 * 3600 + + insert_res = client.insert( + SimpleArgs.new(job_num: 1), + insert_opts: River::InsertOpts.new(scheduled_at: target_time) + ) + expect(insert_res.job).to have_attributes( + scheduled_at: be_within(2).of(target_time), + state: River::JOB_STATE_SCHEDULED + ) + + # Expect all inserted timestamps to go to UTC. + expect(insert_res.job.scheduled_at.utc?).to be true + end + + it "inserts with job insert opts" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + insert_res = client.insert(args) + expect(insert_res.job).to have_attributes( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + end + + it "inserts with insert opts" do + # We set job insert opts in this spec too so that we can verify that the + # options passed at insertion time take precedence. + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + insert_res = client.insert(args, insert_opts: River::InsertOpts.new( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + )) + expect(insert_res.job).to have_attributes( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + ) + end + + it "inserts with job args hash" do + insert_res = client.insert(River::JobArgsHash.new("hash_kind", { + job_num: 1 + })) + expect(insert_res.job).to have_attributes( + args: {"job_num" => 1}, + kind: "hash_kind" + ) + end + + it "errors if args don't respond to #kind" do + args_klass = Class.new do + def to_json = {} + end + + expect do + client.insert(args_klass.new) + end.to raise_error(RuntimeError, "args should respond to `#kind`") + end + + it "errors if args return nil from #to_json" do + args_klass = Class.new do + def kind = "args_kind" + + def to_json = nil + end + + expect do + client.insert(args_klass.new) + end.to raise_error(RuntimeError, "args should return non-nil from `#to_json`") + end + end + + describe "#insert_many" do + it "inserts many jobs" do + expect do + client.insert_many([]) + end.to raise_error(RuntimeError, "sorry, not implemented yet") + end + end +end diff --git a/spec/job_spec.rb b/spec/job_spec.rb new file mode 100644 index 0000000..d34b40a --- /dev/null +++ b/spec/job_spec.rb @@ -0,0 +1,40 @@ +require "spec_helper" + +describe River::JobArgsHash do + it "generates a job args based on a hash" do + args = River::JobArgsHash.new("my_hash_kind", {job_num: 123}) + expect(args.kind).to eq("my_hash_kind") + expect(args.to_json).to eq(JSON.dump({job_num: 123})) + end + + it "errors on a nil kind" do + expect do + River::JobArgsHash.new(nil, {job_num: 123}) + end.to raise_error(RuntimeError, "kind should be non-nil") + end + + it "errors on a nil hash" do + expect do + River::JobArgsHash.new("my_hash_kind", nil) + end.to raise_error(RuntimeError, "hash should be non-nil") + end +end + +describe River::AttemptError do + it "initializes with parameters" do + now = Time.now + + attempt_error = River::AttemptError.new( + at: now, + attempt: 1, + error: "job failure", + trace: "error trace" + ) + expect(attempt_error).to have_attributes( + at: now, + attempt: 1, + error: "job failure", + trace: "error trace" + ) + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..a906f38 --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,12 @@ +require "debug" + +# Only show coverage information if running the entire suite. +if RSpec.configuration.files_to_run.length > 1 + require "simplecov" + SimpleCov.start do + enable_coverage :branch + minimum_coverage line: 100, branch: 100 + end +end + +require "riverqueue"