From 03b4d65d27b5ed082a460b8e6b841bfd14908158 Mon Sep 17 00:00:00 2001 From: Brandur Date: Sun, 19 Nov 2023 16:12:08 -0800 Subject: [PATCH] First release of River Ruby bindings A first push of Ruby bindings for River. Meant to be used in conjunction with a driver like `riverqueue-sequel` [1] to provide an insert-only client for River. See the README for details on usage. Overall, I'm happy at how close I was able to keep the API to the Go version. A lot of syntax in Go just isn't needed due to the more dynamic and implicit nature of Ruby, but the parts that came through are quite close. e.g. We have a job args concept, along with `InsertOpts` that can be added to both jobs and at insert time, just like Go. Purposely not implemented on this first push (I'll follow up with these later on): * Unique jobs. * Batch insert. [1] https://github.com/riverqueue/riverqueue-ruby-sequel --- .github/workflows/ci.yml | 80 ++++++++++++++++++++ Gemfile | 12 +++ Gemfile.lock | 70 ++++++++++++++++++ README.md | 8 -- docs/README.md | 21 ++++++ docs/development.md | 39 ++++++++++ lib/client.rb | 56 ++++++++++++++ lib/insert_opts.rb | 62 ++++++++++++++++ lib/internal.rb | 44 +++++++++++ lib/job.rb | 155 +++++++++++++++++++++++++++++++++++++++ lib/riverqueue.rb | 9 +++ riverqueue.gemspec | 16 ++-- spec/client_spec.rb | 6 ++ spec/spec_helper.rb | 1 + 14 files changed, 563 insertions(+), 16 deletions(-) create mode 100644 .github/workflows/ci.yml create mode 100644 Gemfile create mode 100644 Gemfile.lock delete mode 100644 README.md create mode 100644 docs/README.md create mode 100644 docs/development.md create mode 100644 lib/client.rb create mode 100644 lib/insert_opts.rb create mode 100644 lib/internal.rb create mode 100644 lib/job.rb create mode 100644 spec/client_spec.rb create mode 100644 spec/spec_helper.rb diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..0871103 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,80 @@ +name: CI + +env: + # Database to connect to that can create other databases with `CREATE DATABASE`. + ADMIN_DATABASE_URL: postgres://postgres:postgres@localhost:5432 + + # Just a common place for steps to put binaries they need and which is added + # to GITHUB_PATH/PATH. + BIN_PATH: /home/runner/bin + + # A suitable URL for a test database. + TEST_DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/river_ruby_test?sslmode=disable + +on: + - push + +jobs: + lint: + runs-on: ubuntu-latest + timeout-minutes: 3 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Ruby + `bundle install` + uses: ruby/setup-ruby@v1 + with: + ruby-version: "head" + bundler-cache: true # runs 'bundle install' and caches installed gems automatically + + - name: Standard Ruby + run: bundle exec standardrb + + spec: + runs-on: ubuntu-latest + timeout-minutes: 3 + + services: + postgres: + image: postgres + env: + POSTGRES_PASSWORD: postgres + options: >- + --health-cmd pg_isready + --health-interval 2s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Ruby + `bundle install` + uses: ruby/setup-ruby@v1 + with: + ruby-version: "head" + bundler-cache: true # runs 'bundle install' and caches installed gems automatically + + # There is a version of Go on Actions' base image, but it's old and can't + # read modern `go.mod` annotations correctly. + - name: Install Go + uses: actions/setup-go@v4 + with: + go-version: "stable" + check-latest: true + + - name: Create database + run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE river_ruby_test;" ${ADMIN_DATABASE_URL} + + - name: Install River CLI + run: go install github.com/riverqueue/river/cmd/river@latest + + - name: river migrate-up + run: ./river migrate-up --database-url $DATABASE_URL + + - name: Rspec + run: bundle exec rspec diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..1cc8109 --- /dev/null +++ b/Gemfile @@ -0,0 +1,12 @@ +source "https://rubygems.org" + +gemspec + +group :development, :test do + gem "standard" +end + +group :test do + gem "rspec-core" + gem "rspec-expectations" +end diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 0000000..3b67d8f --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,70 @@ +PATH + remote: . + specs: + riverqueue (0.0.1) + +GEM + remote: https://rubygems.org/ + specs: + ast (2.4.2) + diff-lcs (1.5.0) + json (2.6.3) + language_server-protocol (3.17.0.3) + lint_roller (1.1.0) + parallel (1.23.0) + parser (3.2.2.4) + ast (~> 2.4.1) + racc + racc (1.7.3) + rainbow (3.1.1) + regexp_parser (2.8.2) + rexml (3.2.6) + rspec-core (3.12.2) + rspec-support (~> 3.12.0) + rspec-expectations (3.12.3) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.12.0) + rspec-support (3.12.1) + rubocop (1.57.2) + json (~> 2.3) + language_server-protocol (>= 3.17.0) + parallel (~> 1.10) + parser (>= 3.2.2.4) + rainbow (>= 2.2.2, < 4.0) + regexp_parser (>= 1.8, < 3.0) + rexml (>= 3.2.5, < 4.0) + rubocop-ast (>= 1.28.1, < 2.0) + ruby-progressbar (~> 1.7) + unicode-display_width (>= 2.4.0, < 3.0) + rubocop-ast (1.30.0) + parser (>= 3.2.1.0) + rubocop-performance (1.19.1) + rubocop (>= 1.7.0, < 2.0) + rubocop-ast (>= 0.4.0) + ruby-progressbar (1.13.0) + standard (1.32.0) + language_server-protocol (~> 3.17.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.57.2) + standard-custom (~> 1.0.0) + standard-performance (~> 1.2) + standard-custom (1.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.50) + standard-performance (1.2.1) + lint_roller (~> 1.1) + rubocop-performance (~> 1.19.1) + unicode-display_width (2.5.0) + +PLATFORMS + arm64-darwin-22 + x86_64-linux + +DEPENDENCIES + riverqueue! + rspec-core + rspec-expectations + standard + +BUNDLED WITH + 2.4.20 diff --git a/README.md b/README.md deleted file mode 100644 index afa2498..0000000 --- a/README.md +++ /dev/null @@ -1,8 +0,0 @@ -# River Ruby bindings - -A future home for River's Ruby bindings. For now, the [Gem is registered](https://rubygems.org/gems/riverqueue), but nothing else is done. - -``` sh -$ gem build riverqueue.gemspec -$ gem push riverqueue-0.0.1.gem -``` diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..af7cffb --- /dev/null +++ b/docs/README.md @@ -0,0 +1,21 @@ +# River client for Ruby [![Build Status](https://github.com/riverqueue/riverqueue-ruby/workflows/CI/badge.svg)](https://github.com/riverqueue/riverqueue-ruby/actions) + +An insert-only Ruby client for [River](https://github.com/riverqueue/river) packaged in the [`riverqueue` gem](https://rubygems.org/gems/riverqueue). + +`Gemfile` should contain the core gem and a driver like [`rubyqueue-sequel`](https://github.com/riverqueue/riverqueue-ruby-sequel): + +``` yaml +gem "riverqueue" +gem "riverqueue-sequel" +``` + +Initialize a client with: + +```ruby +DB = Sequel.connect("postgres://...") +client = River::Client.new(River::Driver::Sequel.new(DB)) +``` + +## Development + +See [development](./development.md). diff --git a/docs/development.md b/docs/development.md new file mode 100644 index 0000000..05caf15 --- /dev/null +++ b/docs/development.md @@ -0,0 +1,39 @@ +# riverqueue-ruby development + +## Install dependencies + +```shell +$ bundle install +``` +## Run tests + +Create a test database and migrate with River's CLI: + +```shell +$ go install github.com/riverqueue/river/cmd/river +$ createdb riverqueue_ruby_test +$ river migrate-up --database-url "postgres://localhost/riverqueue_ruby_test" +``` + +Run all specs: + +```shell +$ bundle exec rspec spec +``` + +## Run lint + +```shell +$ standardrb --fix +``` + +## Publish a new gem + +```shell +git checkout master && git pull --rebase +VERSION=v0.0.x +gem build riverqueue.gemspec +gem push riverqueue-$VERSION.gem +git tag $VERSION +git push --tags +``` \ No newline at end of file diff --git a/lib/client.rb b/lib/client.rb new file mode 100644 index 0000000..be04d41 --- /dev/null +++ b/lib/client.rb @@ -0,0 +1,56 @@ +module River + MAX_ATTEMPTS_DEFAULT = 25 + PRIORITY_DEFAULT = 1 + QUEUE_DEFAULT = "default" + + # Provides a client for River that inserts jobs. Unlike the Go version of the + # River client, this one can insert jobs only. Jobs can only be worked from Go + # code, so job arg kinds and JSON encoding details must be shared between Ruby + # and Go code. + # + # Used in conjunction with a River driver like: + # + # DB = Sequel.connect(...) + # client = River::Client.new(River::Driver::Sequel.new(DB)) + # + # River drivers are found in separate gems like `riverqueue-sequel` to help + # minimize transient dependencies. + class Client + def initialize(driver) + @driver = driver + end + + # Inserts a new job for work given a job args implementation and insertion + # options (which may be omitted). + # + # Job arg implementations are expected to respond to: + # + # * `#kind`: A string that uniquely identifies the job in the database. + # * `#to_json`: Encodes the args to JSON for persistence in the database. + # Must match encoding an args struct on the Go side to be workable. + # + # They may also respond to `#insert_opts` which is expected to return an + # `InsertOpts` that contains options that will apply to all jobs of this + # kind. Insertion options provided as an argument to `#insert` override + # those returned by job args. + def insert(args, insert_opts: InsertOpts.new) + raise "args should respond to `#kind`" if !args.respond_to?(:kind) + raise "args should respond to `#to_json`" if !args.respond_to?(:to_json) + + args_insert_opts = args.respond_to?(:insert_opts) ? args.insert_opts : InsertOpts.new + + scheduled_at = insert_opts.scheduled_at || args_insert_opts.scheduled_at + + @driver.insert(Internal::JobInsertParams.new( + encoded_args: args.to_json, + kind: args.kind, + max_attempts: insert_opts.max_attempts || args_insert_opts.max_attempts || MAX_ATTEMPTS_DEFAULT, + priority: insert_opts.priority || args_insert_opts.priority || PRIORITY_DEFAULT, + queue: insert_opts.queue || args_insert_opts.queue || QUEUE_DEFAULT, + scheduled_at: scheduled_at, # database default to now + state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE, + tags: insert_opts.tags || args_insert_opts.tags + )) + end + end +end diff --git a/lib/insert_opts.rb b/lib/insert_opts.rb new file mode 100644 index 0000000..08fdf16 --- /dev/null +++ b/lib/insert_opts.rb @@ -0,0 +1,62 @@ +module River + class InsertOpts + # MaxAttempts is the maximum number of total attempts (including both the + # original run and all retries) before a job is abandoned and set as + # discarded. + attr_accessor :max_attempts + + # Priority is the priority of the job, with 1 being the highest priority and + # 4 being the lowest. When fetching available jobs to work, the highest + # priority jobs will always be fetched before any lower priority jobs are + # fetched. Note that if your workers are swamped with more high-priority jobs + # then they can handle, lower priority jobs may not be fetched. + # + # Defaults to PRIORITY_DEFAULT. + attr_accessor :priority + + # Queue is the name of the job queue in which to insert the job. + # + # Defaults to QUEUE_DEFAULT. + attr_accessor :queue + + # ScheduledAt is a time in future at which to schedule the job (i.e. in + # cases where it shouldn't be run immediately). The job is guaranteed not + # to run before this time, but may run slightly after depending on the + # number of other scheduled jobs and how busy the queue is. + # + # Use of this option generally only makes sense when passing options into + # Insert rather than when a job args is returning `#insert_opts`, however, + # it will work in both cases. + attr_accessor :scheduled_at + + # Tags are an arbitrary list of keywords to add to the job. They have no + # functional behavior and are meant entirely as a user-specified construct + # to help group and categorize jobs. + # + # If tags are specified from both a job args override and from options on + # Insert, the latter takes precedence. Tags are not merged. + attr_accessor :tags + + # UniqueOpts returns options relating to job uniqueness. An empty struct + # avoids setting any worker-level unique options. + # + # TODO: Implement. + attr_accessor :unique_opts + + def initialize( + max_attempts: nil, + priority: nil, + queue: nil, + scheduled_at: nil, + tags: nil, + unique_opts: nil + ) + self.max_attempts = max_attempts + self.priority = priority + self.queue = queue + self.scheduled_at = scheduled_at + self.tags = tags + self.unique_opts = unique_opts + end + end +end diff --git a/lib/internal.rb b/lib/internal.rb new file mode 100644 index 0000000..0975f59 --- /dev/null +++ b/lib/internal.rb @@ -0,0 +1,44 @@ +module River + module Internal + # Insert parameters for a job. This is sent to underlying drivers and is meant + # for internal use only. Its interface is subject to change. + class JobInsertParams + attr_accessor :encoded_args + attr_accessor :kind + attr_accessor :max_attempts + attr_accessor :priority + attr_accessor :queue + attr_accessor :scheduled_at + attr_accessor :state + attr_accessor :tags + + # TODO(brandur): Get these supported. + # attr_accessor :unique + # attr_accessor :unique_by_args + # attr_accessor :unique_by_period + # attr_accessor :unique_by_queue + # attr_accessor :unique_by_state + + def initialize( + encoded_args: nil, + kind: nil, + max_attempts: nil, + priority: nil, + queue: nil, + scheduled_at: nil, + state: nil, + tags: nil + ) + self.encoded_args = encoded_args + self.kind = kind + self.max_attempts = max_attempts + self.priority = priority + self.queue = queue + self.scheduled_at = scheduled_at + self.state = state + self.tags = tags + end + end + end + private_constant :Internal +end diff --git a/lib/job.rb b/lib/job.rb new file mode 100644 index 0000000..3ae5152 --- /dev/null +++ b/lib/job.rb @@ -0,0 +1,155 @@ +module River + JOB_STATE_AVAILABLE = "available" + JOB_STATE_CANCELLED = "cancelled" + JOB_STATE_COMPLETED = "completed" + JOB_STATE_DISCARDED = "discarded" + JOB_STATE_RETRYABLE = "retryable" + JOB_STATE_RUNNING = "running" + JOB_STATE_SCHEDULED = "scheduled" + + # Provides a way of creating a job args from a simple Ruby hash for a quick + # way to insert a job without having to define a class. The first argument is + # a "kind" string for identifying the job in the database and the second is a + # hash that will be encoded to JSON. + class JobArgsHash + def initialize(kind, hash) + raise "kind should be non-nil" if !kind + raise "hash should be non-nil" if !hash + + @kind = kind + @hash = hash + end + + attr_reader :kind + + def to_json + JSON.dump(@hash) + end + end + + # JobRow contains the properties of a job that are persisted to the database. + class JobRow + # ID of the job. Generated as part of a Postgres sequence and generally + # ascending in nature, but there may be gaps in it as transactions roll + # back. + attr_accessor :id + + # The attempt number of the job. Jobs are inserted at 0, the number is + # incremented to 1 the first time work its worked, and may increment further + # if it's either snoozed or errors. + attr_accessor :attempt + + # The time that the job was last worked. Starts out as `nil` on a new + # insert. + attr_accessor :attempted_at + + # The set of worker IDs that have worked this job. A worker ID differs + # between different programs, but is shared by all executors within any + # given one. (i.e. Different Go processes have different IDs, but IDs are + # shared within any given process.) A process generates a new ULID (an + # ordered UUID) worker ID when it starts up. + attr_accessor :attempted_by + + # When the job record was created. + attr_accessor :created_at + + # The job's args encoded as JSON. + attr_accessor :encoded_args + + # A set of errors that occurred when the job was worked, one for each + # attempt. Ordered from earliest error to the latest error. + attr_accessor :errors + + # The time at which the job was "finalized", meaning it was either completed + # successfully or errored for the last time such that it'll no longer be + # retried. + attr_accessor :finalized_at + + # Kind uniquely identifies the type of job and instructs which worker + # should work it. It is set at insertion time via `#kind` on job args. + attr_accessor :kind + + # The maximum number of attempts that the job will be tried before it errors + # for the last time and will no longer be worked. + attr_accessor :max_attempts + + # The priority of the job, with 1 being the highest priority and 4 being the + # lowest. When fetching available jobs to work, the highest priority jobs + # will always be fetched before any lower priority jobs are fetched. Note + # that if your workers are swamped with more high-priority jobs then they + # can handle, lower priority jobs may not be fetched. + attr_accessor :priority + + # The name of the queue where the job will be worked. Queues can be + # configured independently and be used to isolate jobs. + attr_accessor :queue + + # When the job is scheduled to become available to be worked. Jobs default + # to running immediately, but may be scheduled for the future when they're + # inserted. They may also be scheduled for later because they were snoozed + # or because they errored and have additional retry attempts remaining. + attr_accessor :scheduled_at + + # The state of job like `available` or `completed`. Jobs are `available` + # when they're first inserted. + attr_accessor :state + + # Tags are an arbitrary list of keywords to add to the job. They have no + # functional behavior and are meant entirely as a user-specified construct + # to help group and categorize jobs. + attr_accessor :tags + + def initialize( + id: nil, + attempt: nil, + attempted_by: nil, + created_at: nil, + encoded_args: nil, + errors: nil, + finalized_at: nil, + kind: nil, + max_attempts: nil, + priority: nil, + queue: nil, + scheduled_at: nil, + state: nil, + tags: nil + ) + self.id = id + self.attempt = attempt + self.attempted_by = attempted_by + self.created_at = created_at + self.encoded_args = encoded_args + self.errors = errors + self.finalized_at = finalized_at + self.kind = kind + self.max_attempts = max_attempts + self.priority = priority + self.queue = queue + self.scheduled_at = scheduled_at + self.state = state + self.tags = tags + end + end + + # A failed job work attempt containing information about the error or panic + # that occurred. + class AttemptError + attr_accessor :at + attr_accessor :error + attr_accessor :num + attr_accessor :trace + + def initialize( + at: nil, + error: nil, + num: nil, + trace: nil + ) + self.at = id + self.error = error + self.num = num + self.trace = trace + end + end +end diff --git a/lib/riverqueue.rb b/lib/riverqueue.rb index e69de29..70095cc 100644 --- a/lib/riverqueue.rb +++ b/lib/riverqueue.rb @@ -0,0 +1,9 @@ +require "json" + +require_relative "client" +require_relative "insert_opts" +require_relative "internal" +require_relative "job" + +module River +end diff --git a/riverqueue.gemspec b/riverqueue.gemspec index 1da7766..c7e2ba3 100644 --- a/riverqueue.gemspec +++ b/riverqueue.gemspec @@ -1,11 +1,11 @@ Gem::Specification.new do |s| - s.name = "riverqueue" - s.version = "0.0.1" - s.summary = "A fast job queue for Go" + s.name = "riverqueue" + s.version = "0.0.1" + s.summary = "A fast job queue for Go." s.description = "A fast job queue for Go." - s.authors = ["Blake Gentry", "Brandur Leach"] - s.email = "brandur@brandur.org" - s.files = ["lib/riverqueue.rb"] - s.homepage = "https://riverqueue.com" - s.license = "LGPL-3.0-or-later" + s.authors = ["Blake Gentry", "Brandur Leach"] + s.email = "brandur@brandur.org" + s.files = ["lib/riverqueue.rb"] + s.homepage = "https://riverqueue.com" + s.license = "LGPL-3.0-or-later" end diff --git a/spec/client_spec.rb b/spec/client_spec.rb new file mode 100644 index 0000000..d59bfb9 --- /dev/null +++ b/spec/client_spec.rb @@ -0,0 +1,6 @@ +require "spec_helper" + +RSpec.describe River::Client do + describe "#insert" do + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..3c855b2 --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1 @@ +require "riverqueue"