From 78b1e9a3328a462b20d0bc347b4ceea100faee6c Mon Sep 17 00:00:00 2001 From: Brandur Date: Sun, 19 Nov 2023 16:20:15 -0800 Subject: [PATCH] First release of Sequel driver for River Ruby bindings Related to main `riverqueue` gem's push in [1], this one provides a driver implementation for the Sequel gem. This is a similar concept to use the use of `riverpgxv5` in the main Go package -- it breaks up implementations for specific database packages into separate gems so that projects using River don't have include every third party database framework under the sun. I'll also be writing one for ActiveRecord. Like with [1], functionality for unique jobs and batch inserts is currently missing, to be added on a follow up release. [1] https://github.com/riverqueue/riverqueue-ruby/pull/1 --- .github/workflows/ci.yml | 80 +++++++++++++++ .gitignore | 1 + Gemfile | 17 +++- Gemfile.lock | 75 ++++++++++++++ README.md | 8 -- docs/README.md | 23 +++++ docs/development.md | 48 +++++++++ lib/driver.rb | 73 ++++++++++++++ lib/riverqueue-sequel.rb | 12 +-- riverqueue-sequel.gemspec | 19 ++-- spec/driver_spec.rb | 202 ++++++++++++++++++++++++++++++++++++++ spec/spec_helper.rb | 19 ++++ 12 files changed, 552 insertions(+), 25 deletions(-) create mode 100644 .github/workflows/ci.yml delete mode 100644 README.md create mode 100644 docs/README.md create mode 100644 docs/development.md create mode 100644 lib/driver.rb create mode 100644 spec/driver_spec.rb create mode 100644 spec/spec_helper.rb diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..2f05017 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,80 @@ +name: CI + +env: + # Database to connect to that can create other databases with `CREATE DATABASE`. + ADMIN_DATABASE_URL: postgres://postgres:postgres@localhost:5432 + + # Just a common place for steps to put binaries they need and which is added + # to GITHUB_PATH/PATH. + BIN_PATH: /home/runner/bin + + # A suitable URL for a test database. + TEST_DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/riverqueue_ruby_test?sslmode=disable + +on: + - push + +jobs: + lint: + runs-on: ubuntu-latest + timeout-minutes: 3 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Ruby + `bundle install` + uses: ruby/setup-ruby@v1 + with: + ruby-version: "head" + bundler-cache: true # runs 'bundle install' and caches installed gems automatically + + - name: Standard Ruby + run: bundle exec standardrb + + spec: + runs-on: ubuntu-latest + timeout-minutes: 3 + + services: + postgres: + image: postgres + env: + POSTGRES_PASSWORD: postgres + options: >- + --health-cmd pg_isready + --health-interval 2s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Ruby + `bundle install` + uses: ruby/setup-ruby@v1 + with: + ruby-version: "head" + bundler-cache: true # runs 'bundle install' and caches installed gems automatically + + # There is a version of Go on Actions' base image, but it's old and can't + # read modern `go.mod` annotations correctly. + - name: Install Go + uses: actions/setup-go@v4 + with: + go-version: "stable" + check-latest: true + + - name: Create database + run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE riverqueue_ruby_test;" ${ADMIN_DATABASE_URL} + + - name: Install River CLI + run: go install github.com/riverqueue/river/cmd/river@latest + + - name: river migrate-up + run: river migrate-up --database-url "$TEST_DATABASE_URL" + + - name: Rspec + run: bundle exec rspec diff --git a/.gitignore b/.gitignore index 8966b04..e6df5c3 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /*.gem +/coverage/ diff --git a/Gemfile b/Gemfile index d926697..7c051b8 100644 --- a/Gemfile +++ b/Gemfile @@ -1,2 +1,15 @@ -source 'https://rubygems.org' -gemspec \ No newline at end of file +source "https://rubygems.org" + +gemspec + +group :development, :test do + gem "riverqueue", git: "https://github.com/riverqueue/riverqueue-ruby", branch: "brandur-first-release" + # gem "riverqueue", path: "../riverqueue-ruby" + gem "standard" +end + +group :test do + gem "rspec-core" + gem "rspec-expectations" + gem "simplecov", require: false +end diff --git a/Gemfile.lock b/Gemfile.lock index e74aba7..f861328 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,17 +1,92 @@ +GIT + remote: https://github.com/riverqueue/riverqueue-ruby + revision: 8d029045f4fee36289ce1da2e7a067fd851c84d7 + branch: brandur-first-release + specs: + riverqueue (0.0.1) + PATH remote: . specs: riverqueue-sequel (0.0.1) + pg + sequel GEM remote: https://rubygems.org/ specs: + ast (2.4.2) + bigdecimal (3.1.4) + diff-lcs (1.5.0) + docile (1.4.0) + json (2.7.1) + language_server-protocol (3.17.0.3) + lint_roller (1.1.0) + parallel (1.24.0) + parser (3.3.0.5) + ast (~> 2.4.1) + racc + pg (1.5.4) + racc (1.7.3) + rainbow (3.1.1) + regexp_parser (2.9.0) + rexml (3.2.6) + rspec-core (3.12.2) + rspec-support (~> 3.12.0) + rspec-expectations (3.12.3) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.12.0) + rspec-support (3.12.1) + rubocop (1.61.0) + json (~> 2.3) + language_server-protocol (>= 3.17.0) + parallel (~> 1.10) + parser (>= 3.3.0.2) + rainbow (>= 2.2.2, < 4.0) + regexp_parser (>= 1.8, < 3.0) + rexml (>= 3.2.5, < 4.0) + rubocop-ast (>= 1.30.0, < 2.0) + ruby-progressbar (~> 1.7) + unicode-display_width (>= 2.4.0, < 3.0) + rubocop-ast (1.31.1) + parser (>= 3.3.0.4) + rubocop-performance (1.20.2) + rubocop (>= 1.48.1, < 2.0) + rubocop-ast (>= 1.30.0, < 2.0) + ruby-progressbar (1.13.0) + sequel (5.74.0) + bigdecimal + simplecov (0.22.0) + docile (~> 1.1) + simplecov-html (~> 0.11) + simplecov_json_formatter (~> 0.1) + simplecov-html (0.12.3) + simplecov_json_formatter (0.1.4) + standard (1.34.0) + language_server-protocol (~> 3.17.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.60) + standard-custom (~> 1.0.0) + standard-performance (~> 1.3) + standard-custom (1.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.50) + standard-performance (1.3.1) + lint_roller (~> 1.1) + rubocop-performance (~> 1.20.2) + unicode-display_width (2.5.0) PLATFORMS arm64-darwin-22 + x86_64-linux DEPENDENCIES + riverqueue! riverqueue-sequel! + rspec-core + rspec-expectations + simplecov + standard BUNDLED WITH 2.4.20 diff --git a/README.md b/README.md deleted file mode 100644 index 515d42b..0000000 --- a/README.md +++ /dev/null @@ -1,8 +0,0 @@ -# River Ruby bindings Sequel driver - -A future home for River's Ruby bindings. For now, the [Gem is registered](https://rubygems.org/gems/riverqueue), but nothing else is done. - -``` sh -$ gem build riverqueue-sequel.gemspec -$ gem push riverqueue-sequel-0.0.1.gem -``` diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..e3f6601 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,23 @@ +# riverqueue-sequel [![Build Status](https://github.com/riverqueue/riverqueue-ruby-sequel/workflows/CI/badge.svg)](https://github.com/riverqueue/riverqueue-ruby-sequel/actions) + +[Sequel](https://github.com/jeremyevans/sequel) driver for [River](https://github.com/riverqueue/river)'s [`riverqueue` gem for Ruby](https://rubygems.org/gems/riverqueue). + +`Gemfile` should contain the core gem and a driver like this one: + +``` yaml +gem "riverqueue" +gem "riverqueue-sequel" +``` + +Initialize a client with: + +```ruby +DB = Sequel.connect("postgres://...") +client = River::Client.new(River::Driver::Sequel.new(DB)) +``` + +See also [`rubyqueue`](https://github.com/riverqueue/riverqueue-ruby). + +## Development + +See [development](./development.md). diff --git a/docs/development.md b/docs/development.md new file mode 100644 index 0000000..2f85549 --- /dev/null +++ b/docs/development.md @@ -0,0 +1,48 @@ +# riverqueue-ruby development + +## Install dependencies + +```shell +$ bundle install +``` +## Run tests + +Create a test database and migrate with River's CLI: + +```shell +$ go install github.com/riverqueue/river/cmd/river +$ createdb riverqueue_ruby_test +$ river migrate-up --database-url "postgres://localhost/riverqueue_ruby_test" +``` + +Run all specs: + +```shell +$ bundle exec rspec spec +``` + +## Run lint + +```shell +$ standardrb --fix +``` + +## Code coverage + +Running the entire test suite will produce a coverage report, and will fail if line and branch coverage is below 100%. Run the suite and open `coverage/index.html` to find lines or branches that weren't covered: + +```shell +$ bundle exec rspec spec +$ open coverage/index.html +``` + +## Publish a new gem + +```shell +git checkout master && git pull --rebase +VERSION=v0.0.x +gem build riverqueue-sequel.gemspec +gem push riverqueue-sequel-$VERSION.gem +git tag $VERSION +git push --tags +``` \ No newline at end of file diff --git a/lib/driver.rb b/lib/driver.rb new file mode 100644 index 0000000..b998cc4 --- /dev/null +++ b/lib/driver.rb @@ -0,0 +1,73 @@ +module River::Driver + # Provides a Sequel driver for River. + # + # Used in conjunction with a River client like: + # + # DB = Sequel.connect("postgres://...") + # client = River::Client.new(River::Driver::Sequel.new(DB)) + # + class Sequel + def initialize(db) + @db = db + + # It's Ruby, so we can only define a model after Sequel's established a + # connection because it's all dynamic. + if !River::Driver::Sequel.const_defined?(:RiverJob) + River::Driver::Sequel.const_set(:RiverJob, Class.new(::Sequel::Model(:river_job))) + + # Since we only define our model once, take advantage of knowing this is + # our first initialization to add required extensions. + db.extension(:pg_array) + end + end + + def insert(insert_params) + # the call to `#compact` is important so that we remove nils and table + # default values get picked up instead + to_job_row( + RiverJob.create( + { + args: insert_params.encoded_args, + kind: insert_params.kind, + max_attempts: insert_params.max_attempts, + priority: insert_params.priority, + queue: insert_params.queue, + state: insert_params.state, + scheduled_at: insert_params.scheduled_at, + tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags) : nil + }.compact + ) + ) + end + + private def to_job_row(river_job) + # needs to be accessed through values because Sequel shadows `errors` + errors = river_job.values[:errors] + + River::JobRow.new( + id: river_job.id, + attempt: river_job.attempt, + attempted_at: river_job.attempted_at, + attempted_by: river_job.attempted_by, + created_at: river_job.created_at, + encoded_args: river_job.args, + errors: errors ? JSON.parse(errors, symbolize_names: true).map { |e| + River::AttemptError.new( + at: Time.parse(e[:at]), + attempt: e[:attempt], + error: e[:error], + trace: e[:trace] + ) + } : nil, + finalized_at: river_job.finalized_at, + kind: river_job.kind, + max_attempts: river_job.max_attempts, + priority: river_job.priority, + queue: river_job.queue, + scheduled_at: river_job.scheduled_at, + state: river_job.state, + tags: river_job.tags + ) + end + end +end diff --git a/lib/riverqueue-sequel.rb b/lib/riverqueue-sequel.rb index 34da925..9c7ba09 100644 --- a/lib/riverqueue-sequel.rb +++ b/lib/riverqueue-sequel.rb @@ -1,8 +1,6 @@ +require "sequel" + +require_relative "driver" + module River - module Driver - module Sequel - def initialize - end - end - end -end \ No newline at end of file +end diff --git a/riverqueue-sequel.gemspec b/riverqueue-sequel.gemspec index 0877bb7..52c0118 100644 --- a/riverqueue-sequel.gemspec +++ b/riverqueue-sequel.gemspec @@ -1,11 +1,14 @@ Gem::Specification.new do |s| - s.name = "riverqueue-sequel" - s.version = "0.0.1" - s.summary = "Sequel driver for the River Ruby gem." + s.name = "riverqueue-sequel" + s.version = "0.0.1" + s.summary = "Sequel driver for the River Ruby gem." s.description = "Sequel driver for the River Ruby gem." - s.authors = ["Blake Gentry", "Brandur Leach"] - s.email = "brandur@brandur.org" - s.files = ["lib/riverqueue-sequel.rb"] - s.homepage = "https://riverqueue.com" - s.license = "LGPL-3.0-or-later" + s.authors = ["Blake Gentry", "Brandur Leach"] + s.email = "brandur@brandur.org" + s.files = ["lib/riverqueue-sequel.rb"] + s.homepage = "https://riverqueue.com" + s.license = "LGPL-3.0-or-later" + + s.add_dependency "pg" + s.add_dependency "sequel" end diff --git a/spec/driver_spec.rb b/spec/driver_spec.rb new file mode 100644 index 0000000..c7f0927 --- /dev/null +++ b/spec/driver_spec.rb @@ -0,0 +1,202 @@ +require "spec_helper" + +class SimpleArgs + attr_accessor :job_num + + def initialize(job_num:) + self.job_num = job_num + end + + def kind = "simple" + + def to_json = JSON.dump({job_num: job_num}) +end + +# Lets us test job-specific insertion opts by making `#insert_opts` an accessor. +# Real args that make use of this functionality will probably want to make +# `#insert_opts` a non-accessor method instead. +class SimpleArgsWithInsertOpts < SimpleArgs + attr_accessor :insert_opts +end + +RSpec.describe River::Driver::Sequel do + around(:each) { |ex| test_transaction(&ex) } + + let!(:driver) { River::Driver::Sequel.new(DB) } + let(:client) { River::Client.new(driver) } + + describe "#insert" do + it "inserts a job" do + job = client.insert(SimpleArgs.new(job_num: 1)) + expect(job).to have_attributes( + attempt: 0, + created_at: be_within(2).of(Time.now), + encoded_args: %({"job_num": 1}), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now), + state: River::JOB_STATE_AVAILABLE, + tags: ::Sequel.pg_array([]) + ) + + # Make sure it made it to the database. Assert only minimally since we're + # certain it's the same as what we checked above. + river_job = River::Driver::Sequel::RiverJob.first(id: job.id) + expect(river_job).to have_attributes( + kind: "simple" + ) + end + + it "schedules a job" do + target_time = Time.now.utc + 1 * 3600 + + job = client.insert( + SimpleArgs.new(job_num: 1), + insert_opts: River::InsertOpts.new(scheduled_at: target_time) + ) + expect(job).to have_attributes( + scheduled_at: be_within(2).of(target_time), + state: River::JOB_STATE_SCHEDULED + ) + end + + it "inserts with job insert opts" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + job = client.insert(args) + expect(job).to have_attributes( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + end + + it "inserts with insert opts" do + # We set job insert opts in this spec too so that we can verify that the + # options passed at insertion time take precedence. + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + job = client.insert(args, insert_opts: River::InsertOpts.new( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + )) + expect(job).to have_attributes( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + ) + end + + it "inserts with job args hash" do + job = client.insert(River::JobArgsHash.new("hash_kind", { + job_num: 1 + })) + expect(job).to have_attributes( + encoded_args: %({"job_num": 1}), + kind: "hash_kind" + ) + end + + it "inserts in a transaction" do + job = nil + + DB.transaction(savepoint: true) do + job = client.insert(SimpleArgs.new(job_num: 1)) + + river_job = River::Driver::Sequel::RiverJob.first(id: job.id) + expect(river_job).to_not be_nil + + raise Sequel::Rollback + end + + # Not visible because the job was rolled back. + river_job = River::Driver::Sequel::RiverJob.first(id: job.id) + expect(river_job).to be_nil + end + end + + describe "#to_job_row" do + it "converts a database record to `River::JobRow`" do + now = Time.now.utc + river_job = River::Driver::Sequel::RiverJob.new( + attempt: 1, + attempted_at: now, + attempted_by: "client1", + created_at: now, + args: %({"job_num":1}), + finalized_at: now, + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: now, + state: River::JOB_STATE_COMPLETED, + tags: ["tag1"] + ) + river_job.id = 1 + + job_row = driver.send(:to_job_row, river_job) + + expect(job_row).to be_an_instance_of(River::JobRow) + expect(job_row).to have_attributes( + id: 1, + attempt: 1, + attempted_at: now, + attempted_by: "client1", + created_at: now, + encoded_args: %({"job_num":1}), + finalized_at: now, + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: now, + state: River::JOB_STATE_COMPLETED, + tags: ["tag1"] + ) + end + + it "with errors" do + now = Time.now.utc + river_job = River::Driver::Sequel::RiverJob.new( + errors: JSON.dump([ + { + at: now, + attempt: 1, + error: "job failure", + trace: "error trace" + } + ]) + ) + + job_row = driver.send(:to_job_row, river_job) + + expect(job_row.errors.count).to be(1) + expect(job_row.errors[0]).to be_an_instance_of(River::AttemptError) + expect(job_row.errors[0]).to have_attributes( + at: now.floor(0), + attempt: 1, + error: "job failure", + trace: "error trace" + ) + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..4a1d589 --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,19 @@ +require "sequel" + +DB = Sequel.connect(ENV["TEST_DATABASE_URL"] || "postgres://localhost/riverqueue_ruby_test") + +def test_transaction + DB.transaction do + yield + raise Sequel::Rollback + end +end + +require "simplecov" +SimpleCov.start do + enable_coverage :branch + minimum_coverage line: 100, branch: 100 +end + +require "riverqueue" +require "riverqueue-sequel"