From c7cb3ff6160c5d8db57e5c4a9ebb68f1b6629c4a Mon Sep 17 00:00:00 2001 From: "James Armes (they/them)" Date: Wed, 28 Aug 2024 11:12:53 -0400 Subject: [PATCH] CCAP-304: Add support for background jobs (#9) * Added background jobs with job to deactivate expired auth keys. * Added bootstrap system * Added linting for code documentation. * Added markdown linting. * Replace unmaintained GitHub action for rubocop --- .github/config/rubocop_linter_action.yml | 45 ---- .github/workflows/branch.yaml | 8 +- .github/workflows/main.yaml | 8 +- .gitignore | 2 + .pryrc | 22 +- .rubocop.yml | 8 + Dockerfile | 2 +- Gemfile | 2 + Gemfile.lock | 25 +++ README.md | 13 +- Rakefile | 14 +- config.ru | 34 +-- db/1_schema.rb | 27 ++- db/migrations/202408091839_create_jobs.rb | 26 +++ doc/api.md | 17 ++ doc/api/authentication.md | 2 +- doc/runbooks/create_auth_key.md | 10 +- doc/worker.md | 80 +++++++ docker-compose.yaml | 46 ++-- document-transfer-service.gemspec | 8 + lib/api/middleware/correlation_id.rb | 6 +- lib/api/middleware/instrument.rb | 20 +- lib/api/middleware/request_id.rb | 6 +- lib/api/middleware/request_logging.rb | 30 +-- lib/bootstrap.rb | 20 ++ lib/bootstrap/api.rb | 26 +++ lib/bootstrap/console.rb | 28 +++ lib/bootstrap/rake.rb | 34 +++ lib/bootstrap/stage/base.rb | 33 +++ lib/bootstrap/stage/database.rb | 72 ++++++ lib/bootstrap/stage/jobs.rb | 30 +++ lib/bootstrap/stage/logger.rb | 22 ++ lib/bootstrap/stage/models.rb | 18 ++ lib/bootstrap/stage/prompt.rb | 52 +++++ lib/bootstrap/stage/rake_tasks.rb | 20 ++ lib/bootstrap/stage/telemetry.rb | 29 +++ lib/bootstrap/stage/worker.rb | 57 +++++ lib/bootstrap/worker.rb | 26 +++ lib/config/application.rb | 12 +- lib/delayed/backend/sequel.rb | 211 ++++++++++++++++++ lib/document_transfer.rb | 2 + lib/job.rb | 41 ++++ lib/job/base.rb | 107 +++++++++ lib/job/cron/base.rb | 60 +++++ lib/job/cron/expire_key.rb | 24 ++ lib/job/queue.rb | 63 ++++++ lib/rake/database/create.rb | 3 + lib/rake/database/database.rb | 2 +- lib/rake/database/setup.rb | 2 +- lib/rake/jobs/jobs.rb | 29 +++ lib/rake/jobs/queue.rb | 30 +++ lib/rake/jobs/schedule.rb | 29 +++ lib/{api => }/util/measure.rb | 0 sample.env | 1 + script/api | 14 ++ script/worker | 14 ++ .../document_transfer/rake/jobs/queue_spec.rb | 30 +++ .../rake/jobs/schedule_spec.rb | 14 ++ spec/spec_helper.rb | 13 +- spec/support/contexts/rake.rb | 11 + spec/support/examples.rb | 1 + spec/support/examples/bootstrap_stages.rb | 31 +++ spec/support/factories.rb | 2 + .../factories/config/application_factory.rb | 13 ++ spec/support/factories/model/job_factory.rb | 11 + .../document_transfer/bootstrap/api_spec.rb | 7 + .../bootstrap/console_spec.rb | 7 + .../document_transfer/bootstrap/rake_spec.rb | 7 + .../bootstrap/stage/database_spec.rb | 51 +++++ .../bootstrap/stage/prompt_spec.rb | 51 +++++ .../bootstrap/stage/worker_spec.rb | 52 +++++ .../bootstrap/worker_spec.rb | 7 + .../delayed/backend/sequel/job_spec.rb | 105 +++++++++ .../document_transfer/job/cron/base_spec.rb | 96 ++++++++ .../job/cron/expire_key_spec.rb | 45 ++++ spec/unit/document_transfer/job/queue_spec.rb | 63 ++++++ spec/unit/document_transfer/job_spec.rb | 29 +++ 77 files changed, 2045 insertions(+), 173 deletions(-) delete mode 100644 .github/config/rubocop_linter_action.yml create mode 100644 db/migrations/202408091839_create_jobs.rb create mode 100644 doc/worker.md create mode 100644 lib/bootstrap.rb create mode 100644 lib/bootstrap/api.rb create mode 100644 lib/bootstrap/console.rb create mode 100644 lib/bootstrap/rake.rb create mode 100644 
lib/bootstrap/stage/base.rb create mode 100644 lib/bootstrap/stage/database.rb create mode 100644 lib/bootstrap/stage/jobs.rb create mode 100644 lib/bootstrap/stage/logger.rb create mode 100644 lib/bootstrap/stage/models.rb create mode 100644 lib/bootstrap/stage/prompt.rb create mode 100644 lib/bootstrap/stage/rake_tasks.rb create mode 100644 lib/bootstrap/stage/telemetry.rb create mode 100644 lib/bootstrap/stage/worker.rb create mode 100644 lib/bootstrap/worker.rb create mode 100644 lib/delayed/backend/sequel.rb create mode 100644 lib/job.rb create mode 100644 lib/job/base.rb create mode 100644 lib/job/cron/base.rb create mode 100644 lib/job/cron/expire_key.rb create mode 100644 lib/job/queue.rb create mode 100644 lib/rake/jobs/jobs.rb create mode 100644 lib/rake/jobs/queue.rb create mode 100644 lib/rake/jobs/schedule.rb rename lib/{api => }/util/measure.rb (100%) create mode 100755 script/api create mode 100755 script/worker create mode 100644 spec/functional/document_transfer/rake/jobs/queue_spec.rb create mode 100644 spec/functional/document_transfer/rake/jobs/schedule_spec.rb create mode 100644 spec/support/examples/bootstrap_stages.rb create mode 100644 spec/support/factories/config/application_factory.rb create mode 100644 spec/support/factories/model/job_factory.rb create mode 100644 spec/unit/document_transfer/bootstrap/api_spec.rb create mode 100644 spec/unit/document_transfer/bootstrap/console_spec.rb create mode 100644 spec/unit/document_transfer/bootstrap/rake_spec.rb create mode 100644 spec/unit/document_transfer/bootstrap/stage/database_spec.rb create mode 100644 spec/unit/document_transfer/bootstrap/stage/prompt_spec.rb create mode 100644 spec/unit/document_transfer/bootstrap/stage/worker_spec.rb create mode 100644 spec/unit/document_transfer/bootstrap/worker_spec.rb create mode 100644 spec/unit/document_transfer/delayed/backend/sequel/job_spec.rb create mode 100644 spec/unit/document_transfer/job/cron/base_spec.rb create mode 100644 spec/unit/document_transfer/job/cron/expire_key_spec.rb create mode 100644 spec/unit/document_transfer/job/queue_spec.rb create mode 100644 spec/unit/document_transfer/job_spec.rb diff --git a/.github/config/rubocop_linter_action.yml b/.github/config/rubocop_linter_action.yml deleted file mode 100644 index 91819cb..0000000 --- a/.github/config/rubocop_linter_action.yml +++ /dev/null @@ -1,45 +0,0 @@ -# .github/config/rubocop_linter_action.yml - -# Description: The name of the check that will be created. -# Valid Options: A reasonably sized string. -# Default: 'RuboCop Action' -check_name: 'RuboCop Results' - -# Description: Versions required to run your RuboCop checks. -# Valid options: RuboCop and any RuboCop extension, by default the latest gem version will be used. You can explicitly state that -# (not required) or use a version number, like '1.5.1'. -# Default: -# versions: -# - rubocop: 'latest' -versions: - - rubocop: 'latest' - - rubocop-factory_bot: 'latest' - - rubocop-rake: 'latest' - - rubocop-rspec: 'latest' - - rubocop-sequel: 'latest' - -# Description: RuboCop configuration file path relative to the workspace. -# Valid options: A valid file path inside of the workspace. -# Default: nil -# Note: This does not need to be filled out for RuboCop to still find your config. -# Resource: https://rubocop.readthedocs.io/en/stable/configuration/ -rubocop_config_path: '.rubocop.yml' - -# Whether or not to use --force-exclusion when building the rubocop command. 
Use this if you are only linting modified -# files and typically excluded files have been changed. For example, if you exclude db/schema.rb in your rubocop.yml -# but a change gets made, then with the check_scope config set to 'modified' rubocop will lint db/schema.rb. If you set -# this to true, rubocop will ignore it. -# Valid options: true || false -# Default: false -rubocop_force_exclusion: true - -# The scope of code that RuboCop should lint. Use this if you only want to lint changed files. If this is not set -# or not equal to 'modified', RuboCop is run against the entire codebase. Note that this will not work on the master branch. -# Valid options: 'modified' -# Default: nil -check_scope: 'modified' - -# The base branch against which changes will be compared, if check_scope config is set to 'modified'. -# This setting is not used if check_scope != 'modified'. -# Valid options: 'origin/another_branch' -base_branch: 'origin/main' diff --git a/.github/workflows/branch.yaml b/.github/workflows/branch.yaml index 7cebeec..d326e15 100644 --- a/.github/workflows/branch.yaml +++ b/.github/workflows/branch.yaml @@ -12,10 +12,12 @@ jobs: steps: - uses: actions/checkout@v4 - run: git fetch origin main --depth=1 + - name: Set up Ruby + uses: ruby/setup-ruby@v1 + with: + bundler-cache: true - name: RuboCop Linter - uses: andrewmcodes/rubocop-linter-action@v3.3.0 - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: bundle exec rubocop spec: runs-on: ubuntu-latest diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 5db4807..b062c6f 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -14,10 +14,12 @@ jobs: with: fetch-depth: 1 - run: git fetch origin main --depth=1 + - name: Set up Ruby + uses: ruby/setup-ruby@v1 + with: + bundler-cache: true - name: RuboCop Linter - uses: andrewmcodes/rubocop-linter-action@v3.3.0 - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: bundle exec rubocop spec: runs-on: ubuntu-latest diff --git a/.gitignore b/.gitignore index 7ae3bf7..fe48e4f 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ *.env !sample.env coverage/ +log/ # Ignore Byebug command history file. .byebug_history @@ -17,6 +18,7 @@ coverage/ # Include project IDE settings, but ignore user-specific settings. !.idea/ +.idea/dataSources.xml *.iml *.iws out/ diff --git a/.pryrc b/.pryrc index 161bfd5..b09e9d3 100644 --- a/.pryrc +++ b/.pryrc @@ -1,25 +1,7 @@ # frozen_string_literal: true -require 'sequel' - require_relative 'lib/config/application' -require_relative 'lib/model' +require_relative 'lib/bootstrap/console' config = DocumentTransfer::Config::Application.from_environment -Sequel.connect(config.database_credentials) - -DocumentTransfer::Model.load - -color = "\e[1;32m" -color = "\e[1;31m" if config.prod? -color = "\e[1;33m" if config.prod_like? 
- -Pry.config.prompt_name = "document-transfer(#{config.environment})" -Pry.config.prompt = Pry::Prompt.new( - :document_transfer, - 'Document transfer console prompt', - [ - proc { |_, _, p| "#{color}[#{p.input_ring.count}] #{p.config.prompt_name} > \e[0m" }, - proc { |_, _, p| "#{color}[#{p.input_ring.count}] #{p.config.prompt_name} * \e[0m" } - ] -) +DocumentTransfer::Bootstrap::Console.new(config).bootstrap diff --git a/.rubocop.yml b/.rubocop.yml index b99ec94..5dec583 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -1,8 +1,10 @@ require: - rubocop-factory_bot + - rubocop-md - rubocop-rake - rubocop-rspec - rubocop-sequel + - rubocop-yard AllCops: NewCops: enable @@ -18,6 +20,12 @@ AllCops: FactoryBot/FactoryAssociationWithStrategy: Enabled: false +Layout/InitialIndentation: + Exclude: + # Runbooks often have indented code blocks and rubocop doesn't like that. + # https://github.com/rubocop/rubocop-md/issues/28 + - doc/runbooks/**/*.md + # Don't let long declarations and invocations to contribute to method length. # This is preferable over squeezing them onto a single line. Metrics/MethodLength: diff --git a/Dockerfile b/Dockerfile index bb5db9b..aeee8fc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,4 +16,4 @@ RUN bundle binstubs --all # Copy the application code. COPY . . -CMD ["bundle", "exec", "rackup", "--port", "3000", "--host", "0.0.0.0"] +CMD ["./script/api"] diff --git a/Gemfile b/Gemfile index ddf7afa..37bf712 100644 --- a/Gemfile +++ b/Gemfile @@ -13,9 +13,11 @@ group :development do gem 'rake', '~> 13.0' gem 'rubocop', '~> 1.48' gem 'rubocop-factory_bot', '~> 2.23' + gem 'rubocop-md', '~> 1.2' gem 'rubocop-rake', '~> 0.6' gem 'rubocop-rspec', '~> 2.22' gem 'rubocop-sequel', '~> 0.3' + gem 'rubocop-yard', '~> 0.9.3' end group :test do diff --git a/Gemfile.lock b/Gemfile.lock index 70180ba..fef1bad 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -10,15 +10,20 @@ PATH remote: . 
specs: document-transfer-service (0.1.0) + activesupport (~> 7.1.0) adal (~> 1.0) bcrypt (~> 3.1) + daemons (~> 1.4) + delayed_job (~> 4.1) faraday (~> 2.9) + fugit (~> 1.11) grape (~> 2.0) grape-entity (~> 1.0) grape-swagger (~> 2.1) grape-swagger-entity (~> 0.5) httparty (~> 0.22) opentelemetry-exporter-otlp (~> 0.27) + opentelemetry-instrumentation-delayed_job (~> 0.22) opentelemetry-instrumentation-faraday (~> 0.24) opentelemetry-instrumentation-grape (~> 0.1) opentelemetry-instrumentation-rack (~> 0.24) @@ -55,6 +60,9 @@ GEM concurrent-ruby (1.3.3) connection_pool (2.4.1) csv (3.3.0) + daemons (1.4.1) + delayed_job (4.1.11) + activesupport (>= 3.0, < 8.0) diff-lcs (1.5.1) docile (1.4.0) drb (2.2.1) @@ -73,6 +81,8 @@ GEM dry-inflector (~> 1.0) dry-logic (~> 1.4) zeitwerk (~> 2.6) + et-orbi (1.2.11) + tzinfo factory_bot (6.4.6) activesupport (>= 5.0.0) faker (3.4.1) @@ -82,6 +92,9 @@ GEM logger faraday-net_http (3.1.0) net-http + fugit (1.11.0) + et-orbi (~> 1, >= 1.2.11) + raabro (~> 1.4) google-protobuf (4.27.2) bigdecimal rake (>= 13) @@ -166,6 +179,9 @@ GEM opentelemetry-api (~> 1.0) opentelemetry-common (~> 0.21) opentelemetry-registry (~> 0.1) + opentelemetry-instrumentation-delayed_job (0.22.4) + opentelemetry-api (~> 1.0) + opentelemetry-instrumentation-base (~> 0.22.1) opentelemetry-instrumentation-faraday (0.24.5) opentelemetry-api (~> 1.0) opentelemetry-instrumentation-base (~> 0.22.1) @@ -193,6 +209,7 @@ GEM pry (0.14.2) coderay (~> 1.1) method_source (~> 1.0) + raabro (1.4.0) racc (1.8.0) rack (3.1.7) rack-test (2.1.0) @@ -239,6 +256,8 @@ GEM rubocop (~> 1.41) rubocop-factory_bot (2.26.1) rubocop (~> 1.61) + rubocop-md (1.2.2) + rubocop (>= 1.0) rubocop-rake (0.6.0) rubocop (~> 1.0) rubocop-rspec (2.31.0) @@ -250,6 +269,9 @@ GEM rubocop (~> 1.61) rubocop-sequel (0.3.4) rubocop (~> 1.0) + rubocop-yard (0.9.3) + rubocop (~> 1.21) + yard ruby-progressbar (1.13.0) ruby2_keywords (0.0.5) semantic_logger (4.16.0) @@ -280,6 +302,7 @@ GEM uri (0.13.0) uri_template (0.7.0) webrick (1.8.1) + yard (0.9.36) zeitwerk (2.6.16) PLATFORMS @@ -311,9 +334,11 @@ DEPENDENCIES rspec-uuid (~> 0.6) rubocop (~> 1.48) rubocop-factory_bot (~> 2.23) + rubocop-md (~> 1.2) rubocop-rake (~> 0.6) rubocop-rspec (~> 2.22) rubocop-sequel (~> 0.3) + rubocop-yard (~> 0.9.3) simplecov (~> 0.22) sqlite3 (~> 2.0) diff --git a/README.md b/README.md index c2b3f28..05456b6 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ service. Using docker compose, the application code will be mounted from your local system to the running container. This will allow you to make changes to the code and see them reflected in the running service without having to rebuild the -image. +image. Using docker compose will launch the api, the worker, and a database. To run the service with docker compose, make sure you have [Docker Desktop] installed and run the following: @@ -88,11 +88,19 @@ bundle exec rake db:setup You should now be able to start the service with: ```sh -bundle exec rackup +./script/api ``` The service should now be available at `http://localhost:9292`. +#### Running the worker + +The worker is a separate process that is responsible for processing background +jobs. Depending on the parts of the service you're using, you may need to run +the worker alongside the API. + +To run the worker, see the [worker documentation][worker] for more information. + #### Updating the service When you update the service locally, you will need to install any new @@ -136,3 +144,4 @@ service. 
[ruby-version]: ./.ruby-version [rvm]: https://rvm.io/ [source]: ./doc/sources.md +[worker]: ./doc/worker.md diff --git a/Rakefile b/Rakefile index 547eae7..0671635 100644 --- a/Rakefile +++ b/Rakefile @@ -6,6 +6,15 @@ require 'rubocop/rake_task' require_relative 'lib/document_transfer' +require_relative 'lib/bootstrap/rake' +require_relative 'lib/config/application' + +require_relative 'lib/api/api' + +# Bootstrap the application for rake. +config = DocumentTransfer::Config::Application.from_environment +DocumentTransfer::Bootstrap::Rake.new(config).bootstrap + task default: %i[spec rubocop] task :environment do # rubocop:disable Rake/Desc @@ -16,9 +25,8 @@ GrapeSwagger::Rake::OapiTasks.new('::DocumentTransfer::API::API') RuboCop::RakeTask.new(:rubocop) do |task| task.requires << 'rubocop' + task.formatters = %w[pacman] + task.formatters << 'github' if ENV.fetch('GITHUB_ACTIONS', false) end RSpec::Core::RakeTask.new(:spec) - -# Load our custom tasks. -DocumentTransfer.load_rake_tasks diff --git a/config.ru b/config.ru index 213c3d5..294ab50 100644 --- a/config.ru +++ b/config.ru @@ -1,40 +1,18 @@ # frozen_string_literal: true -require 'opentelemetry/sdk' -require 'opentelemetry-exporter-otlp' -require 'opentelemetry/instrumentation/faraday' -require 'opentelemetry/instrumentation/grape' +require 'rackup' require 'opentelemetry/instrumentation/rack' -require 'semantic_logger' -require 'sequel' -require_relative 'lib/document_transfer' +require_relative 'lib/bootstrap/api' +require_relative 'lib/config/application' require_relative 'lib/api/api' require_relative 'lib/api/middleware' -require_relative 'lib/config/application' -require_relative 'lib/model' -# Connect to the database. +# Bootstrap the application. config = DocumentTransfer::Config::Application.from_environment -Sequel.connect(config.database_credentials) - -# Load all models. -DocumentTransfer::Model.load - -# Configure the logger. -SemanticLogger.default_level = ENV.fetch('LOG_LEVEL', - DocumentTransfer::DEFAULT_LOG_LEVEL) -SemanticLogger.application = DocumentTransfer::NAME -SemanticLogger.add_appender(io: $stdout, formatter: :json) - -# Configure telemetry reporting. -OpenTelemetry::SDK.configure do |c| - c.service_name = DocumentTransfer::NAME - c.service_version = DocumentTransfer::VERSION - c.use_all -end +DocumentTransfer::Bootstrap::API.new(config).bootstrap -# Include Rack middleware. +# Load Rack middleware. 
use Rack::RewindableInput::Middleware use(*OpenTelemetry::Instrumentation::Rack::Instrumentation.instance.middleware_args) DocumentTransfer::API::Middleware.load.each { |m| use m } diff --git a/db/1_schema.rb b/db/1_schema.rb index ee77084..720d479 100644 --- a/db/1_schema.rb +++ b/db/1_schema.rb @@ -6,16 +6,33 @@ TrueClass :active, :default=>true DateTime :created, :null=>false DateTime :updated, :null=>false - + primary_key [:id] end - + + create_table(:jobs, :ignore_index_errors=>true) do + primary_key :id + Integer :priority, :default=>0 + Integer :attempts, :default=>0 + String :handler, :text=>true + DateTime :run_at + DateTime :locked_at + String :locked_by, :text=>true + DateTime :failed_at + String :last_error, :text=>true + String :queue, :size=>128 + String :cron, :text=>true + + index [:locked_at], :name=>:index_delayed_jobs_locked_at + index [:priority, :run_at], :name=>:index_delayed_jobs_run_at_priority + end + create_table(:schema_migrations) do String :filename, :text=>true, :null=>false - + primary_key [:filename] end - + create_table(:auth_keys) do String :id, :null=>false foreign_key :consumer_id, :consumers, :type=>String, :null=>false, :key=>[:id] @@ -25,7 +42,7 @@ DateTime :updated, :null=>false DateTime :expires, :null=>false DateTime :last_access - + primary_key [:id] end end diff --git a/db/migrations/202408091839_create_jobs.rb b/db/migrations/202408091839_create_jobs.rb new file mode 100644 index 0000000..079642f --- /dev/null +++ b/db/migrations/202408091839_create_jobs.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +Sequel.migration do + up do + create_table(:jobs) do + primary_key :id + Integer :priority, default: 0 + Integer :attempts, default: 0 + String :handler, text: true + DateTime :run_at + DateTime :locked_at + String :locked_by, text: true + DateTime :failed_at + String :last_error, text: true + String :queue, size: 128 + String :cron + + index [:locked_at], name: :index_delayed_jobs_locked_at + index %i[priority run_at], name: :index_delayed_jobs_run_at_priority + end + end + + down do + drop_table(:jobs) + end +end diff --git a/doc/api.md b/doc/api.md index 90d804d..63d6739 100644 --- a/doc/api.md +++ b/doc/api.md @@ -56,7 +56,24 @@ Additional fields may be included based on the destination type. } ``` +## Instrumentation + +The following metrics are reported for each API call using the +`DocumentTransfer::API::Middleware::Instrument` middleware for rack. + +| Metric Name | Description | +|------------------------------|------------------------------------------------------------| +| `endpoint.requests.count` | Counter incremented with each request to a valid endpoint. | +| `endpoint.requests.duration` | Request duration, in milliseconds, for valid endpoints. | + +## Logging + +All requests are logged using the `DocumentTransfer::API::Middleware::Logger` +middleware for rack. The logger utilizes [semantic logging][semantic_logger] in +JSON format to provide easily parsable log entries. 
+ [authentication]: ./api/authentication.md [destination]: ./destinations.md +[semantic_logger]: https://logger.rocketjob.io/ [source]: ./sources.md [spec]: ../openapi.yaml diff --git a/doc/api/authentication.md b/doc/api/authentication.md index a896750..afc9c83 100644 --- a/doc/api/authentication.md +++ b/doc/api/authentication.md @@ -10,7 +10,7 @@ header with your request in order to properly authenticate: A properly formatted header will similar to the following: ```http request -authorization: Bearer realm="c57a46d7-d053-4eea-a6c5-1ec4e3d2f267" MYVJvZ7ay1ecAPU7aXMb26E6bpL5qX57BYY8q7KSyYU= +authorization: Bearer realm="c57a46d7-d053-4eea-a6c5-1ec4e3d2f267" MYVJvZ7ay1ecAPU7aXMb26E6bpL5qX57BYY8q7KSyYU= ``` ## Creating diff --git a/doc/runbooks/create_auth_key.md b/doc/runbooks/create_auth_key.md index a4e6085..2299927 100644 --- a/doc/runbooks/create_auth_key.md +++ b/doc/runbooks/create_auth_key.md @@ -4,13 +4,13 @@ In order to access the API, a client will need a consumer with a valid authentication key. These can be created by using the [console]. 1. Open the [console]. -1. If the key is for a new client, create the consumer and note the id: - +1. If the key is for a new client, create the consumer and note the id: + ```ruby consumer = DocumentTransfer::Model::Consumer.create(name: '') consumer.id ``` - + 1. If the key is for an existing client, retrieve it using the id or name: ```ruby @@ -25,8 +25,8 @@ authentication key. These can be created by using the [console]. ```ruby key = DocumentTransfer::Model::AuthKey.create(consumer:) key.plain_key - ``` - + ``` + 1. Save the consumer id and key in a secure location accessible by the client. 2. For information on how to use these values, see the [authentication documentation][auth]. diff --git a/doc/worker.md b/doc/worker.md new file mode 100644 index 0000000..7c6747f --- /dev/null +++ b/doc/worker.md @@ -0,0 +1,80 @@ +# Document Transfer Service Worker + +The Document Transfer Service utilizes [delayed_job] to schedule and execute +background tasks. The worker is responsible for processing those tasks. + +## Starting + +The worker can be started by running the following from the root of the +repository: + +```bash +./script/worker run +``` + +This will run the worker in the foreground, allowing you to see the output +in your console. To run the worker in the background, you can use the `start` +and `stop` commands. + +```bash +./script/worker start +./script/worker stop +``` + +## Configuration + +The worker requires little configuration, and the defaults will be appropriate +for most deployments. If you'd like to customize the configuration, you can set +the following environment variables: + +| Name | Description | Default | +|------------------------|---------------------------------------------------|---------| +| `QUEUE_STATS_INTERVAL` | Interval, in seconds, to report queue statistics. | `30` | + +## Implementation + +Individual jobs are defined in `lib/job`. To implement a new, non-recurring job, +create a new class that extends `DocumentTransfer::Job::Base` and implements the +`#perform` method. The `#perform` method should contain the logic to be executed +when the job is processed. + +To create a new recurring job, create a new class under `lib/job/cron` that +extends `DocumentTransfer::Job::Cron::Base` and implements the `#perform` as +described above. Additionally, set `self.cron_expression` at the top of your +class to a valid [cron expression][cron]. 
+ +The base classes will handle queuing, initialize a `logger`, and record metrics. +To add your new job to the system, require the class in +`DocumentTransfer::Job.load`. + +## Instrumentation + +The following metrics are collected and reported by StatsD regularly (default is +every 30 seconds). They are gathered by a separate thread that is created when +bootstrapping the worker in `DocumentTransfer::Bootstrap::Stage::Worker`. + +| Metric Name | Description | +|-----------------------------|------------------------------------------------------------------------------------------------------------------| +| `jobs.queue.oldest.age` | Age, in seconds, of the oldest job in the queue. Does not include recurring jobs scheduled to run in the future. | +| `jobs.queue.size.recurring` | Total number of recurring jobs in the queue. | +| `jobs.queue.size.running` | Number of jobs currently being processed by a worker. | +| `jobs.queue.size.total` | Total number of jobs in the queue, including recurring jobs. | +| `jobs.queue.size.waiting` | Number of jobs waiting to be processed, excluding recurring jobs scheduled to run in the future. | + +Additionally, the following metrics are reported for each job: + +| Metric Name | Description | +|---------------------------|--------------------------------------------------------| +| `jobs.queued.count` | This counter is incremented each time a job is queued. | +| `jobs.completed.duration` | Execution time for individual jobs. | + +## Tooling + +The following [rake] tasks are available to help with managing the queue: + +- `rake jobs:queue` - Print information about the queue in JSON format. +- `rake jobs:schedule` - Schedule all recurring jobs. + +[cron]: https://crontab.guru/ +[delayed_job]: https://github.com/collectiveidea/delayed_job +[rake]: https://ruby.github.io/rake/ diff --git a/docker-compose.yaml b/docker-compose.yaml index 4114fce..f97df50 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,27 +1,27 @@ + +x-service-defaults: &service-defaults + build: . + depends_on: + db: + condition: service_healthy + restart: always + environment: + ONEDRIVE_CLIENT_ID: ${ONEDRIVE_CLIENT_ID} + ONEDRIVE_CLIENT_SECRET: ${ONEDRIVE_CLIENT_SECRET} + ONEDRIVE_DRIVE_ID: ${ONEDRIVE_DRIVE_ID} + ONEDRIVE_TENANT_ID: ${ONEDRIVE_TENANT_ID} + DATABASE_USER: ${DATABASE_USER:-postgres} + DATABASE_PASSWORD: ${DATABASE_PASSWORD:-postgres} + DATABASE_HOST: db + DATABASE_NAME: ${POSTGRES_DB:-document_transfer} + volumes: + - .:/opt/app + services: api: - build: . 
- depends_on: - db: - condition: service_healthy - restart: always - environment: - ONEDRIVE_CLIENT_ID: ${ONEDRIVE_CLIENT_ID} - ONEDRIVE_CLIENT_SECRET: ${ONEDRIVE_CLIENT_SECRET} - ONEDRIVE_DRIVE_ID: ${ONEDRIVE_DRIVE_ID} - ONEDRIVE_TENANT_ID: ${ONEDRIVE_TENANT_ID} - DATABASE_USER: ${DATABASE_USER:-postgres} - DATABASE_PASSWORD: ${DATABASE_PASSWORD:-postgres} - DATABASE_HOST: db - DATABASE_NAME: ${POSTGRES_DB:-document_transfer} - volumes: - - .:/opt/app + <<: *service-defaults ports: - - "3000:${PORT:-3000}" - command: > - bash -c "bundle install - && bundle exec rake db:setup - && bundle exec rackup --port 3000 --host 0.0.0.0" + - "3000:3000" db: image: postgres:16 @@ -42,5 +42,9 @@ services: start_period: 30s timeout: 10s + worker: + <<: *service-defaults + command: ./script/worker run + volumes: postgres: diff --git a/document-transfer-service.gemspec b/document-transfer-service.gemspec index 7e33536..8f4d145 100644 --- a/document-transfer-service.gemspec +++ b/document-transfer-service.gemspec @@ -21,15 +21,23 @@ Gem::Specification.new do |s| s.required_ruby_version = '>= 3.3' # Add runtime dependencies. + # We don't use activesupport directly, but our dependencies do, and + # factory_bot fails on 7.2 + # https://github.com/thoughtbot/factory_bot/issues/1685 + s.add_dependency 'activesupport', '~> 7.1.0' s.add_dependency 'adal', '~> 1.0' s.add_dependency 'bcrypt', '~> 3.1' + s.add_dependency 'daemons', '~> 1.4' + s.add_dependency 'delayed_job', '~> 4.1' s.add_dependency 'faraday', '~> 2.9' + s.add_dependency 'fugit', '~> 1.11' s.add_dependency 'grape', '~> 2.0' s.add_dependency 'grape-entity', '~> 1.0' s.add_dependency 'grape-swagger', '~> 2.1' s.add_dependency 'grape-swagger-entity', '~> 0.5' s.add_dependency 'httparty', '~> 0.22' s.add_dependency 'opentelemetry-exporter-otlp', '~> 0.27' + s.add_dependency 'opentelemetry-instrumentation-delayed_job', '~> 0.22' s.add_dependency 'opentelemetry-instrumentation-faraday', '~> 0.24' s.add_dependency 'opentelemetry-instrumentation-grape', '~> 0.1' s.add_dependency 'opentelemetry-instrumentation-rack', '~> 0.24' diff --git a/lib/api/middleware/correlation_id.rb b/lib/api/middleware/correlation_id.rb index b977d97..2f95e85 100644 --- a/lib/api/middleware/correlation_id.rb +++ b/lib/api/middleware/correlation_id.rb @@ -14,15 +14,15 @@ class CorrelationId # Initialize the middleware # - # @param [Rack::Events] app The Rack application. + # @param app [Rack::Events] The Rack application. def initialize(app) @app = app end # Ensure that the request has a correlation id. # - # @param [Hash] env The environment hash. - # @return [Array] The response # for the request. def call(env) # Prefer the existing correlation id, if it exists. diff --git a/lib/api/middleware/instrument.rb b/lib/api/middleware/instrument.rb index abd5461..072b596 100644 --- a/lib/api/middleware/instrument.rb +++ b/lib/api/middleware/instrument.rb @@ -5,7 +5,7 @@ require_relative 'correlation_id' require_relative 'request_id' -require_relative '../util/measure' +require_relative '../../util/measure' module DocumentTransfer module API @@ -15,14 +15,14 @@ class Instrument include DocumentTransfer::Util::Measure DEFAULT_TAGS = %W[ - service:document-transfer-service + service:#{DocumentTransfer::NAME} version:#{DocumentTransfer::VERSION} environment:#{ENV.fetch('RACK_ENV', 'development')} ].freeze # Initialize the middleware # - # @param [Rack::Events] app The Rack application. + # @param app [Rack::Events] The Rack application. 
def initialize(app) @app = app end @@ -34,8 +34,8 @@ def initialize(app) # executed. Additionally, we want to include the HTTP status code in the # tags. # - # @param [Hash] env The environment hash. - # @return [Array] The response # for the request. def call(env) telemetry_attributes(env) @@ -57,7 +57,7 @@ def call(env) # Fetches the name for the endpoint we're instrumenting. # - # @param [Grape::Endpoint] endpoint The Grape endpoint. + # @param endpoint [Grape::Endpoint] The Grape endpoint. # @return [String] def endpoint_name(endpoint) endpoint.options[:route_options][:endpoint_name] || @@ -70,7 +70,7 @@ def endpoint_name(endpoint) # We use the namespace and path of the endpoint, stripping any "/" and # joining the parts with a ".". # - # @param [Grape::Endpoint] endpoint The Grape endpoint. + # @param endpoint [Grape::Endpoint] The Grape endpoint. # @return [String] def build_endpoint_name(endpoint) parts = (endpoint.namespace.split('/') + @@ -81,7 +81,7 @@ def build_endpoint_name(endpoint) # Adds OpenTelemetry attributes to the current span. # - # @param [Hash] env The environment hash. + # @param env [Hash] The environment hash. def telemetry_attributes(env) current_span = OpenTelemetry::Trace.current_span current_span.add_attributes( @@ -92,8 +92,8 @@ def telemetry_attributes(env) # Builds an array of tags to include with our stats. # - # @param [Grape::Endpoint] endpoint The Grape endpoint. - # @param [Integer] status The HTTP status code. + # @param endpoint [Grape::Endpoint] The Grape endpoint. + # @param status [Integer] The HTTP status code. # @return [Array] def tags(endpoint, status: nil) tags = DEFAULT_TAGS + %W[ diff --git a/lib/api/middleware/request_id.rb b/lib/api/middleware/request_id.rb index 4cfeb75..f65a63e 100644 --- a/lib/api/middleware/request_id.rb +++ b/lib/api/middleware/request_id.rb @@ -14,15 +14,15 @@ class RequestId # Initialize the middleware # - # @param [Rack::Events] app The Rack application. + # @param app [Rack::Events] The Rack application. def initialize(app) @app = app end # Ensure that the request has a request id. # - # @param [Hash] env The environment hash. - # @return [Array] The response # for the request. def call(env) # Prefer the existing request id, if it exists. diff --git a/lib/api/middleware/request_logging.rb b/lib/api/middleware/request_logging.rb index fadeab2..b6e7b12 100644 --- a/lib/api/middleware/request_logging.rb +++ b/lib/api/middleware/request_logging.rb @@ -3,7 +3,7 @@ require 'semantic_logger' require_relative 'request_id' -require_relative '../util/measure' +require_relative '../../util/measure' module DocumentTransfer module API @@ -23,15 +23,15 @@ class RequestLogging # Initialize the middleware # - # @param [Rack::Events] app The Rack application. + # @param app [Rack::Events] The Rack application. def initialize(app) @app = app end # Log the request and response details. # - # @param [Hash] env The environment hash. - # @return [Array] The response # for the request. def call(env) response, time = measure { @app.call(env) } @@ -43,14 +43,14 @@ def call(env) # Fetches the content length for the response. # - # @param [Rack::Headers] headers The response headers. + # @param headers [Rack::Headers] The response headers. def content_length(headers) headers[Rack::CONTENT_LENGTH] || '-' end # Fetches the query string from the request. # - # @param [Rack::Request] request The request. + # @param request [Rack::Request] The request. def query_string(request) request.query_string.empty? ? 
'' : "?#{request.query_string}" end @@ -62,17 +62,17 @@ def query_string(request) # instead ensures we get the request id if it was added by our # middleware. # - # @param [Hash] env The environment hash. + # @param env [Hash] The environment hash. def request_id(env) env.fetch(RequestId::KEY, '-') end # Log the request details. # - # @param [Integer] status The HTTP status code. - # @param [Rack::Headers] headers The response headers. - # @param [Hash] env The environment hash. - # @param [Float] duration The duration of the request. + # @param status [Integer] The HTTP status code. + # @param headers [Rack::Headers] The response headers. + # @param env [Hash] The environment hash. + # @param duration [Float] The duration of the request. def log(status, headers, env, duration) SemanticLogger.push_named_tags(DEFAULT_TAGS) logger.info( @@ -84,10 +84,10 @@ def log(status, headers, env, duration) # Builds the payload for the log message. # - # @param [Integer] status The HTTP status code. - # @param [Rack::Headers] headers The response headers. - # @param [Hash] env The environment hash. - # @param [Float] duration The duration of the request. + # @param status [Integer] The HTTP status code. + # @param headers [Rack::Headers] The response headers. + # @param env [Hash] The environment hash. + # @param duration [Float] The duration of the request. # @return [Hash] def payload(status, headers, env, duration) request = Rack::Request.new(env) diff --git a/lib/bootstrap.rb b/lib/bootstrap.rb new file mode 100644 index 0000000..ae4b25d --- /dev/null +++ b/lib/bootstrap.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +module DocumentTransfer + # Top-level module for the application bootstrap system. + module Bootstrap + # Load all boostrap stages into memory. + # + # This is a convenience method to ensure all stages are loaded. It does not + # execute any of the stages. + def self.load_stages + require_relative 'bootstrap/stage/database' + require_relative 'bootstrap/stage/jobs' + require_relative 'bootstrap/stage/logger' + require_relative 'bootstrap/stage/models' + require_relative 'bootstrap/stage/prompt' + require_relative 'bootstrap/stage/telemetry' + require_relative 'bootstrap/stage/worker' + end + end +end diff --git a/lib/bootstrap/api.rb b/lib/bootstrap/api.rb new file mode 100644 index 0000000..6cdb8c1 --- /dev/null +++ b/lib/bootstrap/api.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +require_relative 'stage/database' +require_relative 'stage/logger' +require_relative 'stage/models' +require_relative 'stage/telemetry' +require_relative 'stage/jobs' + +module DocumentTransfer + module Bootstrap + # Boostrap the API. + class API + def initialize(config) + @config = config + end + + def bootstrap + Stage::Logger.new(@config).bootstrap + Stage::Database.new(@config).bootstrap + Stage::Models.new(@config).bootstrap + Stage::Jobs.new(@config).bootstrap + Stage::Telemetry.new(@config).bootstrap + end + end + end +end diff --git a/lib/bootstrap/console.rb b/lib/bootstrap/console.rb new file mode 100644 index 0000000..22685eb --- /dev/null +++ b/lib/bootstrap/console.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +require_relative 'stage/database' +require_relative 'stage/logger' +require_relative 'stage/jobs' +require_relative 'stage/models' +require_relative 'stage/prompt' +require_relative 'stage/telemetry' + +module DocumentTransfer + module Bootstrap + # Boostrap the console. 
+ class Console + def initialize(config) + @config = config + end + + def bootstrap + Stage::Prompt.new(@config).bootstrap + Stage::Logger.new(@config).bootstrap + Stage::Database.new(@config).bootstrap + Stage::Models.new(@config).bootstrap + Stage::Jobs.new(@config).bootstrap + Stage::Telemetry.new(@config).bootstrap + end + end + end +end diff --git a/lib/bootstrap/rake.rb b/lib/bootstrap/rake.rb new file mode 100644 index 0000000..7d075e5 --- /dev/null +++ b/lib/bootstrap/rake.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: true + +require_relative 'stage/database' +require_relative 'stage/jobs' +require_relative 'stage/logger' +require_relative 'stage/rake_tasks' + +module DocumentTransfer + module Bootstrap + # Boostrap the API. + class Rake + include SemanticLogger::Loggable + + def initialize(config) + @config = config + end + + def bootstrap + Stage::Logger.new(@config).bootstrap + + # We may not have a database available when running rake tasks. Try to + # bootstrap it, but don't fail if it's not available. + begin + Stage::Database.new(@config).bootstrap + Stage::Jobs.new(@config).bootstrap + rescue Sequel::DatabaseConnectionError => e + warn("Database connection error: #{e.message}") + end + + Stage::RakeTasks.new(@config).bootstrap + end + end + end +end diff --git a/lib/bootstrap/stage/base.rb b/lib/bootstrap/stage/base.rb new file mode 100644 index 0000000..96ae6c1 --- /dev/null +++ b/lib/bootstrap/stage/base.rb @@ -0,0 +1,33 @@ +# frozen_string_literal: true + +require 'semantic_logger' + +module DocumentTransfer + module Bootstrap + module Stage + # Base class for bootstrap stages. + class Base + include SemanticLogger::Loggable + + attr_reader :config + + # Initialize the bootstrap stage. + # + # @param config [Config] The configuration. + # @return [self] + def initialize(config) + @config = config + end + + # Bootstrap the stage. + # + # @return [void] + # + # @raise [NotImplementedError] If the method is not implemented. + def bootstrap + raise NotImplementedError + end + end + end + end +end diff --git a/lib/bootstrap/stage/database.rb b/lib/bootstrap/stage/database.rb new file mode 100644 index 0000000..badb0f0 --- /dev/null +++ b/lib/bootstrap/stage/database.rb @@ -0,0 +1,72 @@ +# frozen_string_literal: true + +require 'sequel' + +require_relative 'base' + +module DocumentTransfer + module Bootstrap + module Stage + # Bootstrap the database connection. + class Database < Base + MIGRATIONS_PATH = '../../../db/migrations' + + # Make sure we have a database connection and run migrations. + def bootstrap + migrate(connect_with_create) + end + + private + + # Create a database connection. + # + # @return [Sequel::Database] + def connect + Sequel.connect(config.database_credentials) + end + + # Create a database connection and create the database if it does not + # exist. + # + # @return [Sequel::Database] + def connect_with_create + connect + rescue Sequel::DatabaseConnectionError => e + raise unless e.message =~ /database ".+" does not exist/ + + create + connect + end + + # Create the database. + def create + Sequel.connect(config.database_credentials(base: true)) do |db| + db.execute "CREATE DATABASE #{db.literal(Sequel.lit(config.database_name))}" + logger.info('Created database', database: config.database_name) + end + end + + # Run any pending migrations. + # + # This will use an advisory lock on supported databases to prevent + # multiple instances from running migrations at the same time. + # + # @param db [Sequel::Database] The database connection. 
+ # @param version [Integer] The version to migrate to. + # @return [void] + def migrate(db, version: nil) + Sequel.extension :migration + Sequel::Migrator.run(db, 'db/migrations', target: version, + use_advisory_lock: lock?) + end + + # Whether to use an advisory lock when running migrations. + # + # @return [Boolean] + def lock? + config.database_adapter == 'postgresql' + end + end + end + end +end diff --git a/lib/bootstrap/stage/jobs.rb b/lib/bootstrap/stage/jobs.rb new file mode 100644 index 0000000..050471e --- /dev/null +++ b/lib/bootstrap/stage/jobs.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +require 'delayed_job' + +require_relative 'base' +require_relative '../../job' + +module DocumentTransfer + module Bootstrap + module Stage + # Bootstrap background jobs. + class Jobs < Base + # Setup the job queue and schedule recurring jobs. + def bootstrap + configure + Job.schedule + end + + private + + def configure + require_relative '../../delayed/backend/sequel' + Delayed::Worker.backend = Delayed::Backend::Sequel::Job + Delayed::Worker.logger = SemanticLogger['Delayed::Worker'] + Delayed::Worker.destroy_failed_jobs = false + end + end + end + end +end diff --git a/lib/bootstrap/stage/logger.rb b/lib/bootstrap/stage/logger.rb new file mode 100644 index 0000000..b64f663 --- /dev/null +++ b/lib/bootstrap/stage/logger.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +require 'semantic_logger' + +require_relative 'base' +require_relative '../../document_transfer' + +module DocumentTransfer + module Bootstrap + module Stage + # Bootstrap the logger. + class Logger < Base + # Configure the logger for the application. + def bootstrap + SemanticLogger.default_level = @config.log_level + SemanticLogger.application = DocumentTransfer::NAME + SemanticLogger.add_appender(io: $stdout, formatter: :json) + end + end + end + end +end diff --git a/lib/bootstrap/stage/models.rb b/lib/bootstrap/stage/models.rb new file mode 100644 index 0000000..3d6ac02 --- /dev/null +++ b/lib/bootstrap/stage/models.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +require_relative 'base' +require_relative '../../model' + +module DocumentTransfer + module Bootstrap + module Stage + # Bootstrap models. + class Models < Base + # Load all models into memory. + def bootstrap + Model.load + end + end + end + end +end diff --git a/lib/bootstrap/stage/prompt.rb b/lib/bootstrap/stage/prompt.rb new file mode 100644 index 0000000..3d3125d --- /dev/null +++ b/lib/bootstrap/stage/prompt.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +require 'pry' + +require_relative 'base' +require_relative '../../document_transfer' + +module DocumentTransfer + module Bootstrap + module Stage + # Bootstrap the prompt for the console. + class Prompt < Base + COLOR_GREEN = "\e[1;32m" + COLOR_RED = "\e[1;31m" + COLOR_YELLOW = "\e[1;33m" + + # Configure the prompt for the console. + # + # This will set the prompt name and color based on the environment to + # make it easier to identify the environment when using the console. + # + # @return [void] + def bootstrap # rubocop:disable Metrics/AbcSize + Pry.config.prompt_name = "document-transfer(#{@config.environment})" + Pry.config.prompt = Pry::Prompt.new( + :document_transfer, + 'Document transfer console prompt', + [ + proc { |_, _, p| "#{color}[#{p.input_ring.count}] #{p.config.prompt_name} > \e[0m" }, + proc { |_, _, p| "#{color}[#{p.input_ring.count}] #{p.config.prompt_name} * \e[0m" } + ] + ) + end + + private + + # Determine the color to use for the prompt. 
+ # + # Green for development, yellow for staging, and red for production. + # + # @return [String] + # @link https://gist.github.com/JBlond/2fea43a3049b38287e5e9cefc87b2124 + def color + return COLOR_RED if config.prod? + return COLOR_YELLOW if config.prod_like? + + COLOR_GREEN + end + end + end + end +end diff --git a/lib/bootstrap/stage/rake_tasks.rb b/lib/bootstrap/stage/rake_tasks.rb new file mode 100644 index 0000000..fbcf9af --- /dev/null +++ b/lib/bootstrap/stage/rake_tasks.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +require 'delayed_job' + +require_relative 'base' +require_relative '../../document_transfer' + +module DocumentTransfer + module Bootstrap + module Stage + # Bootstrap rake tasks. + class RakeTasks < Base + # Load all custom rake tasks. + def bootstrap + DocumentTransfer.load_rake_tasks + end + end + end + end +end diff --git a/lib/bootstrap/stage/telemetry.rb b/lib/bootstrap/stage/telemetry.rb new file mode 100644 index 0000000..ab429e1 --- /dev/null +++ b/lib/bootstrap/stage/telemetry.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +require 'opentelemetry/sdk' +require 'opentelemetry-exporter-otlp' +require 'opentelemetry/instrumentation/delayed_job' +require 'opentelemetry/instrumentation/faraday' +require 'opentelemetry/instrumentation/grape' +require 'opentelemetry/instrumentation/rack' + +require_relative 'base' +require_relative '../../document_transfer' + +module DocumentTransfer + module Bootstrap + module Stage + # Bootstrap OpenTelemetry. + class Telemetry < Base + # Configure the OpenTelemetry SDK. + def bootstrap + OpenTelemetry::SDK.configure do |c| + c.service_name = DocumentTransfer::NAME + c.service_version = DocumentTransfer::VERSION + c.use_all + end + end + end + end + end +end diff --git a/lib/bootstrap/stage/worker.rb b/lib/bootstrap/stage/worker.rb new file mode 100644 index 0000000..bc00d3b --- /dev/null +++ b/lib/bootstrap/stage/worker.rb @@ -0,0 +1,57 @@ +# frozen_string_literal: true + +require 'statsd-instrument' + +require_relative 'base' + +module DocumentTransfer + module Bootstrap + module Stage + # Bootstrap the worker. + class Worker < Base + DEFAULT_TAGS = %W[ + service:#{DocumentTransfer::NAME} + version:#{DocumentTransfer::VERSION} + environment:#{ENV.fetch('RACK_ENV', 'development')} + ].freeze + + attr_writer :queue + + # Create a separate thread to monitor the queue. + def bootstrap + Thread.abort_on_exception = true + Thread.new do + require_relative '../../job/queue' + + loop do + sleep config.queue_stats_interval + report(queue) + end + end + end + + # The queue to get statistics for. + # + # @return [DocumentTransfer::Job::Queue] + def queue + @queue ||= DocumentTransfer::Job::Queue.new + end + + private + + # Report stats to StatsD. + # + # @param queue [DocumentTransfer::Job::Queue] Queue to report stats for. 
+ # @return [void] + def report(queue) + stats = queue.stats + StatsD.measure('jobs.queue.oldest.age', stats.delete(:oldest), + tags: DEFAULT_TAGS) + stats.each do |stat, value| + StatsD.measure("jobs.queue.size.#{stat}", value, tags: DEFAULT_TAGS) + end + end + end + end + end +end diff --git a/lib/bootstrap/worker.rb b/lib/bootstrap/worker.rb new file mode 100644 index 0000000..bdf1984 --- /dev/null +++ b/lib/bootstrap/worker.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +require_relative 'stage/database' +require_relative 'stage/jobs' +require_relative 'stage/logger' +require_relative 'stage/telemetry' +require_relative 'stage/worker' + +module DocumentTransfer + module Bootstrap + # Boostrap the worker. + class Worker + def initialize(config) + @config = config + end + + def bootstrap + Stage::Logger.new(@config).bootstrap + Stage::Database.new(@config).bootstrap + Stage::Jobs.new(@config).bootstrap + Stage::Telemetry.new(@config).bootstrap + Stage::Worker.new(@config).bootstrap + end + end + end +end diff --git a/lib/config/application.rb b/lib/config/application.rb index 50044cd..0135d85 100644 --- a/lib/config/application.rb +++ b/lib/config/application.rb @@ -15,11 +15,15 @@ class Application < Base option :database_user, type: String, default: 'postgres' option :environment, type: String, default: 'development', env_variable: 'RACK_ENV' + option :log_level, type: String, default: 'info' + option :port, type: Integer, default: 3000, + env_variable: 'RACK_PORT' + option :queue_stats_interval, type: Integer, default: 30 - def database_credentials + def database_credentials(base: false) { adapter: database_adapter, - database: database_name, + database: base ? base_database : database_name, host: database_host, password: database_password, port: database_port, @@ -34,6 +38,10 @@ def prod? def prod_like? %w[demo staging].include?(environment) end + + def test? + %w[testing test].include?(environment) + end end end end diff --git a/lib/delayed/backend/sequel.rb b/lib/delayed/backend/sequel.rb new file mode 100644 index 0000000..70c1292 --- /dev/null +++ b/lib/delayed/backend/sequel.rb @@ -0,0 +1,211 @@ +# frozen_string_literal: true + +require 'delayed_job' +require 'fugit' +require 'fugit/cron' +require 'sequel' + +module Delayed + module Backend + module Sequel + # Delayed Job Sequel Backend. + class Job < ::Sequel::Model # rubocop:disable Metrics/ClassLength + include Delayed::Backend::Base + + alias save! save + alias update_attributes update + + dataset_module do + # Finds jobs that are ready to be run. + # + # @param worker [String] The name of the worker trying to find a job. + # @param max_run_time [Integer] The maximum time a job is allowed to + # run. + # @return [Sequel::Dataset] The dataset of jobs that are ready to be + # run. + def ready_to_run(worker, max_run_time) + db_time_now = model.db_time_now + filter do + ( + ((run_at <= db_time_now) & + ::Sequel.expr(locked_at: nil)) | + (::Sequel.expr(locked_at: ..(db_time_now - max_run_time))) | + { locked_by: worker } + ) & { failed_at: nil } + end + end + + # Orders jobs by priority and their run time. + # + # @return [Sequel::Dataset] The dataset of jobs ordered by priority. + def by_priority + order( + ::Sequel.expr(:priority).asc, + ::Sequel.expr(:run_at).asc + ) + end + end + + # Before saving, make sure the job has been scheduled. + def before_save + set_next_run_at + set_default_run_at + super + end + + # Don't destroy recurring jobs that need to be rescheduled. 
+ def destroy + if !payload_object.respond_to?(:reschedule_instead_of_destroy) || + !(cron && payload_object.reschedule_instead_of_destroy) + return super + end + + schedule_next_run + end + + # Schedules the next run for a recurring job and unlock it. + def schedule_next_run + self.attempts += 1 + unlock + set_next_run_at + save! + end + + # Scheduled non-recurring jobs to run now. + def set_next_run_at + return unless cron + + self.run_at = Fugit::Cron.do_parse(cron).next_time(now).to_local_time + end + + # Gain an exclusive lock on the job so that it doesn't get picked up by + # another worker. + # + # @param max_run_time [Integer] the maximum time a job is allowed to run + # before it is considered dead. + # @param worker [String] the name of the worker that is trying to lock + # this job. + def lock_exclusively!(max_run_time, worker) + locked = regain_lock(worker) || claim_lock(worker, max_run_time) + + reload if locked + locked + end + + class << self + def db_time_now + Time.now + end + + # Database connections don't like being forked. + def before_fork + ::Sequel::DATABASES.each(&:disconnect) + end + + # Reconnect to the database after forking. + def after_fork + ::Sequel::DATABASES.each { |d| d.connect(d.uri) } + end + + # Clear all locks from this worker. + # + # @param worker [String] the name of the worker to clear the locks + # for. + def clear_locks!(worker) + Job.where(locked_by: worker) + .update(locked_at: nil, locked_by: nil) + end + + # Deletes all jobs in the queue. + def delete_all + dataset.delete + end + + # Find jobs that are available to be run. + # + # @param worker [String] The name of the worker trying to find a job. + # @param limit [Integer] The maximum number of jobs to find. + # @param max_run_time [Integer] The maximum time a job is allowed to + # run. + # @return [Sequel::Dataset] The jobs that are available to be run. + def find_available(worker, limit = 5, max_run_time = Worker.max_run_time) + ds = ready_to_run(worker, max_run_time) + ds = ds.where(priority: Worker.min_priority..) if Worker.min_priority + ds = ds.where(priority: ..Worker.max_priority) if Worker.max_priority + ds = ds.where(queue: Worker.queues) if Worker.queues.any? + + ds.limit(limit).by_priority + end + + # Reserves a job for this worker. + # + # @param worker [Delayed::Worker] The worker that is trying to reserve + # a job. + # @param max_run_time [Integer] The maximum time a job is allowed to + # run. + # @return [Job] The job that was reserved. + def reserve(worker, max_run_time = Worker.max_run_time) + lock_with_for_update( + find_available(worker.name, 1, max_run_time), + worker + ) + end + + # Locks an individual job for a worker. + # + # This method uses a database lock to prevent other workers from + # claiming this job. + # + # @param records [Sequel::Dataset] The dataset to lock a job from. + # @param worker [Delayed::Worker] The worker that is trying to lock + # the job. + # @return [Job] The job that was locked. + def lock_with_for_update(records, worker) + records = records.for_update + db.transaction do + job = records.first + if job + job.locked_at = db_time_now + job.locked_by = worker.name + job.save(raise_on_failure: true) + job + end + end + end + end + + private + + def now + @now ||= self.class.db_time_now + end + + # Regain the lock on a job that was locked by a worker that died. + # + # @param worker [String] the name of the worker trying to lock the job. + # @return [Boolean] Whether or not the lock was obtained. 
+ def regain_lock(worker) + return false unless locked_by == worker + + Job.where(id:, locked_by: worker).update(locked_at: now) == 1 + end + + # Claim a lock on the job. + # + # @param worker [String] the name of the worker trying to lock the job. + # @param max_run_time [Integer] the maximum time a job is allowed to run + # before it is considered dead. + # @return [Boolean] Whether or not the lock was obtained. + def claim_lock(worker, max_run_time) + now = self.class.db_time_now + Job.where(id:, run_at: ..now) + .where do + ::Sequel.expr(locked_at: nil) | + ::Sequel.expr(locked_at: ..(now - max_run_time)) + end + .update(locked_at: now, locked_by: worker) == 1 + end + end + end + end +end diff --git a/lib/document_transfer.rb b/lib/document_transfer.rb index efe08e9..e04df4c 100644 --- a/lib/document_transfer.rb +++ b/lib/document_transfer.rb @@ -10,7 +10,9 @@ module DocumentTransfer # Load all of our custom rake tasks. def self.load_rake_tasks require_relative 'rake/database/database' + require_relative 'rake/jobs/jobs' Rake::Database::Database.new + Rake::Jobs::Jobs.new end end diff --git a/lib/job.rb b/lib/job.rb new file mode 100644 index 0000000..91cf11a --- /dev/null +++ b/lib/job.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +module DocumentTransfer + # Top-level module for background jobs. + module Job + # Load all jobs into memory so that they don't need to be required + # individually. + def self.load + require_relative 'job/cron/expire_key' + end + + # Ensure that all cron jobs are scheduled to be run. + # + # @param force [Boolean] Reschedules all jobs. + # @return [Integer] The number of jobs scheduled. + def self.schedule(force: false) + self.load + + Cron.constants.select do |klass| + job = Cron.const_get(klass) + next unless job.respond_to?(:reschedule) && job.cron_expression + + force ? job.reschedule : job.schedule + end.length + end + + # Remove any currently scheduled jobs. + # + # @return [Integer] The number of jobs unscheduled. + def self.unschedule + self.load + + Cron.constants.select do |klass| + job = Cron.const_get(klass) + next unless job.respond_to?(:remove) && job.cron_expression + + job.remove + end.length + end + end +end diff --git a/lib/job/base.rb b/lib/job/base.rb new file mode 100644 index 0000000..b3cc97c --- /dev/null +++ b/lib/job/base.rb @@ -0,0 +1,107 @@ +# frozen_string_literal: true + +require 'delayed_job' +require 'statsd-instrument' + +require_relative '../util/measure' + +module DocumentTransfer + module Job + # Base class for background jobs. + # + # @abstract Subclass and override {#perform} to implement a job. + class Base + include Util::Measure + + DEFAULT_TAGS = %W[ + service:#{DocumentTransfer::NAME} + version:#{DocumentTransfer::VERSION} + environment:#{ENV.fetch('RACK_ENV', 'development')} + ].freeze + + attr_writer :logger + + # Initialize the job. + # + # @param payload [Hash] The payload required to process the job. + # @return [self] + def initialize(payload = {}) + @payload = payload + end + + # The logger for the job. + # + # @return [SemanticLogger::Logger] + def logger + @logger ||= SemanticLogger[self.class] + end + + # Queue the job for processing. + def queue + Delayed::Job.enqueue(self) + end + + # Perform the job. + # + # @return [void] + # + # @raise [NotImplementedError] If the method is not implemented. + def perform + raise NotImplementedError + end + + # Callback before the job is enqueued. + # + # Increment the count of jobs queued. 
+ def enqueue + StatsD.increment('jobs.queued.count', tags:) + end + + # Callback before the job is executed. + # + # Record the start time of the job. + def before + @start = clock_time + end + + # Callback after the job is executed. + # + # Record the duration of the job. + def after + StatsD.measure('jobs.completed.duration', clock_time - @start, tags:) + end + + private + + # The name of this job to use for statistics. + # + # By default, we use the class name with the first 2 module names + # ("DocumentTransfer::Job") removed and the remaining parts joined with a + # ".". We then convert the name to lower dash case (e.g. "FooBar" becomes + # "foo-bar"). + # + # @return [String] + def stat_name + self.class.name.split('::')[2..].join('.') + .gsub(/(?<=[a-z])\B(?=[A-Z])/, '-').downcase + end + + # The tags to use for statistics. + # + # @return [Array] + def tags + DEFAULT_TAGS + ["job:#{stat_name}"] + end + + class << self + # Enqueue a new job for processing. + # + # @param payload [Hash] The payload required to process the job. + # @return [DocumentTransfer::Job::Base] The job instance. + def queue(payload = {}) + new(payload).tap(&:queue) + end + end + end + end +end diff --git a/lib/job/cron/base.rb b/lib/job/cron/base.rb new file mode 100644 index 0000000..92b243e --- /dev/null +++ b/lib/job/cron/base.rb @@ -0,0 +1,60 @@ +# frozen_string_literal: true + +require_relative '../base' + +module DocumentTransfer + module Job + module Cron + # Base class for recurring cron jobs. + # + # @abstract Subclass and implement {#perform} to define a cron job. + class Base < Base + class_attribute :cron_expression + + attr_accessor :schedule_instead_of_destroy + + def queue + return if self.class.scheduled? + + Delayed::Job.enqueue(self, cron: cron_expression) + end + + def after + self.schedule_instead_of_destroy = true + end + + class << self + # Schedule the job to run. + def schedule + new.queue unless scheduled? + end + + # Remove the job from the schedule. + def remove + delayed_job.destroy if scheduled? + end + + # Reschedule the job. + def reschedule + remove + schedule + end + + # Whether the job is scheduled. + # + # @return [Boolean] + def scheduled? + !delayed_job.nil? + end + + # Return the first instance of this job in the queue. + # + # @return [Delayed::Job, nil] + def delayed_job + Delayed::Job.where(Sequel.like(:handler, "%ruby/object:#{name}%")).first + end + end + end + end + end +end diff --git a/lib/job/cron/expire_key.rb b/lib/job/cron/expire_key.rb new file mode 100644 index 0000000..fef7d31 --- /dev/null +++ b/lib/job/cron/expire_key.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +require_relative 'base' +require_relative '../../model/auth_key' + +module DocumentTransfer + module Job + module Cron + # Mark authentication keys that have expired as inactive. + class ExpireKey < Base + # Run every day at midnight. + self.cron_expression = '0 0 * * *' + + def perform + count = DocumentTransfer::Model::AuthKey.where( + active: true, expires: ..Time.now + ).update(active: false) + + logger.info("#{count} expired keys have been deactivated.") + end + end + end + end +end diff --git a/lib/job/queue.rb b/lib/job/queue.rb new file mode 100644 index 0000000..533f12e --- /dev/null +++ b/lib/job/queue.rb @@ -0,0 +1,63 @@ +# frozen_string_literal: true + +require 'delayed_job' + +module DocumentTransfer + module Job + # Interface for the job queue. + class Queue + # Get statistics about the job queue. 
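+ # + # A usage sketch (the counts shown are illustrative): + # + # @example + # DocumentTransfer::Job::Queue.new.stats + # # => { recurring: 1, oldest: 0, running: 0, total: 1, waiting: 0 }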
+ # + # @return [Hash{Symbol => Integer}] + def stats + { + recurring:, + oldest:, + running:, + total:, + waiting: + } + end + + # Currently scheduled recurring jobs. + # + # @return [Integer] + def recurring + Delayed::Job.exclude(cron: nil).count + end + + # Age, in seconds, of the oldest non-recurring job in the queue. + # + # @return [Float] + def oldest + job = Delayed::Job.where(cron: nil, locked_at: nil).order(:run_at).first + return 0 unless job + + (Time.now - job.run_at).seconds + end + + # Number of jobs currently being worked. + # + # @return [Integer] + def running + Delayed::Job.exclude(locked_at: nil).count + end + + # Total number of jobs in the queue. + # + # @return [Integer] + def total + Delayed::Job.count + end + + # Number of jobs waiting to be worked. + # + # @return [Integer] + def waiting + Delayed::Job.where(locked_at: nil) + .where { Sequel[cron: nil] | Sequel[run_at: ..Time.now] } + .count + end + end + end +end diff --git a/lib/rake/database/create.rb b/lib/rake/database/create.rb index c4c97b2..0fa3d54 100644 --- a/lib/rake/database/create.rb +++ b/lib/rake/database/create.rb @@ -1,5 +1,8 @@ # frozen_string_literal: true +require 'sequel' +require 'pg' + require_relative 'base' module DocumentTransfer diff --git a/lib/rake/database/database.rb b/lib/rake/database/database.rb index 6f2dc97..7e0dc84 100644 --- a/lib/rake/database/database.rb +++ b/lib/rake/database/database.rb @@ -12,7 +12,7 @@ module DocumentTransfer module Rake module Database - # Rake tasks for managing the database. + # Rake namespace for managing the database. class Database < Rake::Base class EnvironmentError < RuntimeError; end diff --git a/lib/rake/database/setup.rb b/lib/rake/database/setup.rb index 67de597..eb388a8 100644 --- a/lib/rake/database/setup.rb +++ b/lib/rake/database/setup.rb @@ -14,7 +14,7 @@ def initialize(name = :setup, *args, &) private def define(args, &task_block) - desc 'Reset the database' + desc 'Setup the database' task(name, *args) do |_, _task_args| ::Rake::Task['db:create'].invoke(task_block) ::Rake::Task['db:migrate'].invoke(task_block) diff --git a/lib/rake/jobs/jobs.rb b/lib/rake/jobs/jobs.rb new file mode 100644 index 0000000..d50af41 --- /dev/null +++ b/lib/rake/jobs/jobs.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +require 'sequel/core' + +require_relative '../base' +require_relative 'queue' +require_relative 'schedule' + +module DocumentTransfer + module Rake + module Jobs + # Rake namespace for managing background jobs. + class Jobs < Rake::Base + def initialize(name = :jobs, *args, &) + super + end + + private + + def define(args, &) + namespace(@name) do + Queue.new(:queue, *args, &) + Schedule.new(:schedule, *args, &) + end + end + end + end + end +end diff --git a/lib/rake/jobs/queue.rb b/lib/rake/jobs/queue.rb new file mode 100644 index 0000000..14f8306 --- /dev/null +++ b/lib/rake/jobs/queue.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +require_relative '../base' +require_relative '../../job/queue' + +module DocumentTransfer + module Rake + module Jobs + # Get information about the job queue. 
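+ # + # Typical invocation (output is illustrative and mirrors Job::Queue#stats): + # + # $ bundle exec rake jobs:queue + # {"recurring":1,"oldest":0,"running":0,"total":1,"waiting":0}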
+ class Queue < Rake::Base + attr_reader :name + + def initialize(name = :queue, *args, &) + super + end + + private + + def define(args, &) + desc 'Get information about the job queue' + task(name, *args) do |_, _task_args| + queue = DocumentTransfer::Job::Queue.new + + puts queue.stats.to_json + end + end + end + end + end +end diff --git a/lib/rake/jobs/schedule.rb b/lib/rake/jobs/schedule.rb new file mode 100644 index 0000000..ce1ec8c --- /dev/null +++ b/lib/rake/jobs/schedule.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +require_relative '../base' + +module DocumentTransfer + module Rake + module Jobs + # Schedule all recurring jobs. + class Schedule < Rake::Base + attr_reader :name + + def initialize(name = :schedule, *args, &) + super + end + + private + + def define(args, &) + desc 'Schedule recurring jobs' + task(name, *args) do |_, _task_args| + count = DocumentTransfer::Job.schedule + + puts "#{count} jobs scheduled successfully." + end + end + end + end + end +end diff --git a/lib/api/util/measure.rb b/lib/util/measure.rb similarity index 100% rename from lib/api/util/measure.rb rename to lib/util/measure.rb diff --git a/sample.env b/sample.env index 936bb62..5b7db5d 100644 --- a/sample.env +++ b/sample.env @@ -1,5 +1,6 @@ export COVERAGE=1 export RACK_ENV=development +export OTEL_TRACES_EXPORTER=console # Update to match your local database configuration, if necessary. export DATABASE_USER="" diff --git a/script/api b/script/api new file mode 100755 index 0000000..715005c --- /dev/null +++ b/script/api @@ -0,0 +1,14 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +require 'bundler/setup' +require 'rackup' + +require_relative '../lib/config/application' + +config = DocumentTransfer::Config::Application.from_environment + +# Configure and start the server. +server = Rackup::Server.new +server.options = server.options.merge(Port: config.port, Host: '0.0.0.0') +server.start diff --git a/script/worker b/script/worker new file mode 100755 index 0000000..e713f9d --- /dev/null +++ b/script/worker @@ -0,0 +1,14 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +require 'bundler/setup' +require 'delayed/command' + +require_relative '../lib/bootstrap/worker' +require_relative '../lib/config/application' + +# Bootstrap the worker. 
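+# Control is then handed to delayed_job's command wrapper below. Assuming the +# standard daemons-based CLI, `script/worker run` runs a worker in the +# foreground, while `start` and `stop` manage a daemonized process.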
+config = DocumentTransfer::Config::Application.from_environment +DocumentTransfer::Bootstrap::Worker.new(config).bootstrap + +Delayed::Command.new(ARGV).daemonize diff --git a/spec/functional/document_transfer/rake/jobs/queue_spec.rb b/spec/functional/document_transfer/rake/jobs/queue_spec.rb new file mode 100644 index 0000000..d1460d3 --- /dev/null +++ b/spec/functional/document_transfer/rake/jobs/queue_spec.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +require 'rake' + +describe 'jobs:queue', type: :functional do + include_context 'with rake' + + before do + create(:job, cron: '* * * * *', run_at: Time.now - 86_400, locked_at: Time.now - 30) + create(:job, run_at: Time.now - 3600) + create(:job, run_at: Time.now - 60) + create(:job, cron: '0 10 * * *') + create(:job, run_at: Time.now + 300) + create(:job, run_at: Time.now - 43_200, locked_at: Time.now - 30) + end + + it 'returns valid json' do + expect(JSON.parse(invoke_task)).to be_a(Hash) + end + + it 'returns statistics about the job queue' do + expect(JSON.parse(invoke_task, symbolize_names: true)).to eq({ + recurring: 2, + oldest: 3600, + running: 2, + total: 6, + waiting: 3 + }) + end +end diff --git a/spec/functional/document_transfer/rake/jobs/schedule_spec.rb b/spec/functional/document_transfer/rake/jobs/schedule_spec.rb new file mode 100644 index 0000000..09780c8 --- /dev/null +++ b/spec/functional/document_transfer/rake/jobs/schedule_spec.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +require 'rake' + +describe 'jobs:schedule', type: :functional do + include_context 'with rake' + + let(:expected_count) { 1 } + + it 'schedules all recurring jobs' do + invoke_task + expect(Delayed::Job.count).to eq(expected_count) + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 73a2e90..f338d03 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -18,6 +18,10 @@ end end +# Any requires for code that we want to measure coverage for should come after +# the SimpleCov.start call. +require_relative '../lib/job' + # Connect to a test database and run migrations. ENV['DATABASE_ADAPTER'] = 'sqlite' ENV['DATABASE_NAME'] = ':memory:' @@ -41,12 +45,19 @@ config.shared_context_metadata_behavior = :apply_to_host_groups config.before do + # Make sure the database is clean before running tests. + Sequel::Migrator.run(db, 'db', target: 0) + Sequel::Migrator.run(db, 'db') + RSPEC_LOGGER.clear allow(SemanticLogger::Logger).to receive(:new).and_return(RSPEC_LOGGER) + + # Unschedule any jobs before running tests. + DocumentTransfer::Job.unschedule end end # Include shared examples and factories. +require_relative 'support/contexts' require_relative 'support/examples' require_relative 'support/factories' -require_relative 'support/contexts' diff --git a/spec/support/contexts/rake.rb b/spec/support/contexts/rake.rb index ad31877..2abbec7 100644 --- a/spec/support/contexts/rake.rb +++ b/spec/support/contexts/rake.rb @@ -13,4 +13,15 @@ DocumentTransfer.load_rake_tasks Rake::Task.define_task(:environment) end + + # Invoke the task and return output. + # + # @return [String] The output of the task. 
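+ # + # @example Parsing JSON output, as the jobs:queue spec does + # JSON.parse(invoke_task)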
+ def invoke_task + output = StringIO.new + $stdout = output + task.invoke + $stdout = STDOUT + output.string + end end diff --git a/spec/support/examples.rb b/spec/support/examples.rb index f0947f6..df41241 100644 --- a/spec/support/examples.rb +++ b/spec/support/examples.rb @@ -1,4 +1,5 @@ # frozen_string_literal: true +require_relative 'examples/bootstrap_stages' require_relative 'examples/instrumented' require_relative 'examples/request_ids' diff --git a/spec/support/examples/bootstrap_stages.rb b/spec/support/examples/bootstrap_stages.rb new file mode 100644 index 0000000..b4e1d89 --- /dev/null +++ b/spec/support/examples/bootstrap_stages.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +require_relative '../../../lib/bootstrap' + +# Verify the expected boostrap stages are run. +RSpec.shared_examples 'bootstrap_stages' do |stages| + subject(:bootstrap) { described_class.new(config) } + + let(:config) { build(:config_application) } + let!(:bootstrap_stages) do + DocumentTransfer::Bootstrap.load_stages + + stages.to_h do |stage| + klass = DocumentTransfer::Bootstrap::Stage.const_get( + stage.to_s.split('_').map(&:capitalize).join + ) + double = instance_double(klass, bootstrap: nil) + allow(klass).to receive(:new).and_return(double) + [stage, double] + end + end + + # The order that stages are loaded is important, so make sure we not only call + # each stage, but that we do so in the expected order. + it 'runs the expected bootstrap stages in order' do + subject.bootstrap + bootstrap_stages.each_value do |double| + expect(double).to have_received(:bootstrap).ordered + end + end +end diff --git a/spec/support/factories.rb b/spec/support/factories.rb index e313945..931054e 100644 --- a/spec/support/factories.rb +++ b/spec/support/factories.rb @@ -5,6 +5,7 @@ include RSpec::Mocks::ExampleMethods end +require_relative 'factories/config/application_factory' require_relative 'factories/config/destination_factory' require_relative 'factories/config/source_factory' @@ -12,6 +13,7 @@ require_relative 'factories/model/auth_key_factory' require_relative 'factories/model/consumer_factory' +require_relative 'factories/model/job_factory' require_relative 'factories/service/one_drive_factory' diff --git a/spec/support/factories/config/application_factory.rb b/spec/support/factories/config/application_factory.rb new file mode 100644 index 0000000..e011c95 --- /dev/null +++ b/spec/support/factories/config/application_factory.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +require_relative '../../../../lib/config/application' + +FactoryBot.define do + factory :config_application, class: DocumentTransfer::Config::Application do + transient do + environment { 'test' } + end + + initialize_with { new(attributes.merge(environment:)) } + end +end diff --git a/spec/support/factories/model/job_factory.rb b/spec/support/factories/model/job_factory.rb new file mode 100644 index 0000000..cd58de6 --- /dev/null +++ b/spec/support/factories/model/job_factory.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +require 'faker' + +require_relative '../../../../lib/delayed/backend/sequel' + +FactoryBot.define do + factory :job, class: Delayed::Backend::Sequel::Job do + id { Faker::Number.number(digits: 3) } + end +end diff --git a/spec/unit/document_transfer/bootstrap/api_spec.rb b/spec/unit/document_transfer/bootstrap/api_spec.rb new file mode 100644 index 0000000..1678c6d --- /dev/null +++ b/spec/unit/document_transfer/bootstrap/api_spec.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +require_relative 
'../../../../lib/bootstrap/api' + +RSpec.describe DocumentTransfer::Bootstrap::API do + include_examples 'bootstrap_stages', %i[logger database models jobs telemetry] +end diff --git a/spec/unit/document_transfer/bootstrap/console_spec.rb b/spec/unit/document_transfer/bootstrap/console_spec.rb new file mode 100644 index 0000000..8d5e738 --- /dev/null +++ b/spec/unit/document_transfer/bootstrap/console_spec.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +require_relative '../../../../lib/bootstrap/console' + +RSpec.describe DocumentTransfer::Bootstrap::Console do + include_examples 'bootstrap_stages', %i[prompt logger database models jobs telemetry] +end diff --git a/spec/unit/document_transfer/bootstrap/rake_spec.rb b/spec/unit/document_transfer/bootstrap/rake_spec.rb new file mode 100644 index 0000000..035fa36 --- /dev/null +++ b/spec/unit/document_transfer/bootstrap/rake_spec.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +require_relative '../../../../lib/bootstrap/rake' + +RSpec.describe DocumentTransfer::Bootstrap::Rake do + include_examples 'bootstrap_stages', %i[logger database jobs rake_tasks] +end diff --git a/spec/unit/document_transfer/bootstrap/stage/database_spec.rb b/spec/unit/document_transfer/bootstrap/stage/database_spec.rb new file mode 100644 index 0000000..b9559e6 --- /dev/null +++ b/spec/unit/document_transfer/bootstrap/stage/database_spec.rb @@ -0,0 +1,51 @@ +# frozen_string_literal: true + +require_relative '../../../../../lib/bootstrap/stage/database' + +# We're using a mock database connection here, rather than a spy, so we can't +# use `.have_received`. +# rubocop:disable RSpec/MessageSpies +RSpec.describe DocumentTransfer::Bootstrap::Stage::Database do + subject(:stage) { described_class.new(config) } + + let(:config) { build(:config_application, base_database: 'rspec', database_name: 'mock') } + let(:db) { Sequel.connect('mock://postgresql') } + + before do + allow(Sequel).to receive(:connect).and_return(db) + allow(db).to receive(:with_advisory_lock) + end + + describe '#bootstrap' do + it 'connects to the database' do + expect(Sequel).to receive(:connect) + stage.bootstrap + end + + it 'runs pending migrations' do + expect(Sequel::Migrator).to receive(:run) + .with(db, 'db/migrations', target: nil, use_advisory_lock: true) + stage.bootstrap + end + + context "when the database doesn't exist" do + before do + allow(Sequel).to receive(:connect).with(config.database_credentials(base: true)).and_yield(db) + + raise_exception = true + allow(Sequel).to receive(:connect).with(config.database_credentials) do + next db unless raise_exception + + raise_exception = false + raise Sequel::DatabaseConnectionError, 'database "mock" does not exist' + end + end + + it 'creates the database' do + expect(db).to receive(:execute).with('CREATE DATABASE mock') + stage.bootstrap + end + end + end +end +# rubocop:enable RSpec/MessageSpies diff --git a/spec/unit/document_transfer/bootstrap/stage/prompt_spec.rb b/spec/unit/document_transfer/bootstrap/stage/prompt_spec.rb new file mode 100644 index 0000000..ba71239 --- /dev/null +++ b/spec/unit/document_transfer/bootstrap/stage/prompt_spec.rb @@ -0,0 +1,51 @@ +# frozen_string_literal: true + +require 'statsd-instrument' + +require_relative '../../../../../lib/bootstrap/stage/prompt' + +# We're using a mock database connection here, rather than a spy, so we can't +# use `.have_received`. 
+RSpec.describe DocumentTransfer::Bootstrap::Stage::Prompt do + include StatsD::Instrument::Matchers + + subject(:stage) { described_class.new(config) } + + let(:config) { build(:config_application, params) } + let(:params) { {} } + + describe '#bootstrap' do + # rubocop:disable RSpec/MessageSpies + it 'sets the prompt name' do + expect(Pry.config).to receive(:prompt_name=).with('document-transfer(test)') + stage.bootstrap + end + # rubocop:enable RSpec/MessageSpies + end + + describe '#color' do + context 'when the environment is production' do + let(:params) { super().merge(environment: 'production') } + + it 'returns red' do + expect(stage.send(:color)).to eq(described_class::COLOR_RED) + end + end + + context 'when the environment is staging' do + let(:params) { super().merge(environment: 'staging') } + + it 'returns yellow' do + expect(stage.send(:color)).to eq(described_class::COLOR_YELLOW) + end + end + + context 'when the environment is development' do + let(:params) { super().merge(environment: 'development') } + + it 'returns green' do + expect(stage.send(:color)).to eq(described_class::COLOR_GREEN) + end + end + end +end diff --git a/spec/unit/document_transfer/bootstrap/stage/worker_spec.rb b/spec/unit/document_transfer/bootstrap/stage/worker_spec.rb new file mode 100644 index 0000000..33c21a5 --- /dev/null +++ b/spec/unit/document_transfer/bootstrap/stage/worker_spec.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +require 'statsd-instrument' + +require_relative '../../../../../lib/bootstrap/stage/worker' +require_relative '../../../../../lib/job/queue' + +# The worker's monitoring loop normally runs in a background thread, so we +# stub the thread and loop to run the reporting code exactly once. +RSpec.describe DocumentTransfer::Bootstrap::Stage::Worker do + include StatsD::Instrument::Matchers + + subject(:stage) { described_class.new(config) } + + let(:config) { build(:config_application) } + let(:stats) { { recurring: 2, running: 2, oldest: 3600, total: 6, waiting: 3 } } + + describe '#bootstrap' do + # rubocop:disable RSpec/SubjectStub + before do + allow(Thread).to receive(:new).and_yield + stage.queue = instance_double(DocumentTransfer::Job::Queue, stats:) + allow(stage).to receive(:sleep) + allow(stage).to receive(:loop).and_yield + end + # rubocop:enable RSpec/SubjectStub + + it 'reports the age of the oldest job' do + expect { stage.bootstrap }.to \ + trigger_statsd_measure('jobs.queue.oldest.age', value: stats[:oldest]) + end + + it 'reports the number of recurring jobs' do + expect { stage.bootstrap }.to \ + trigger_statsd_measure('jobs.queue.size.recurring', value: stats[:recurring]) + end + + it 'reports the number of running jobs' do + expect { stage.bootstrap }.to trigger_statsd_measure('jobs.queue.size.running') + end + + it 'reports the number of total jobs' do + expect { stage.bootstrap }.to \ + trigger_statsd_measure('jobs.queue.size.total', value: stats[:total]) + end + + it 'reports the number of waiting jobs' do + expect { stage.bootstrap }.to \ + trigger_statsd_measure('jobs.queue.size.waiting', value: stats[:waiting]) + end + end +end diff --git a/spec/unit/document_transfer/bootstrap/worker_spec.rb b/spec/unit/document_transfer/bootstrap/worker_spec.rb new file mode 100644 index 0000000..48e25cd --- /dev/null +++ b/spec/unit/document_transfer/bootstrap/worker_spec.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +require_relative '../../../../lib/bootstrap/worker' + +RSpec.describe DocumentTransfer::Bootstrap::Worker do + include_examples 'bootstrap_stages', %i[logger database jobs
telemetry worker] +end diff --git a/spec/unit/document_transfer/delayed/backend/sequel/job_spec.rb b/spec/unit/document_transfer/delayed/backend/sequel/job_spec.rb new file mode 100644 index 0000000..60b049c --- /dev/null +++ b/spec/unit/document_transfer/delayed/backend/sequel/job_spec.rb @@ -0,0 +1,105 @@ +# frozen_string_literal: true + +require_relative '../../../../../../lib/delayed/backend/sequel' + +RSpec.describe Delayed::Backend::Sequel::Job do + subject!(:job) { create(:job, params) } + + let(:params) { {} } + let(:worker) { instance_double(Delayed::Worker, name: 'rspec') } + + describe '.clear_locks!' do + let(:params) { super().merge(locked_by: 'rspec', locked_at: Time.now) } + + it 'clears all locks' do + expect { described_class.clear_locks!('rspec') }.to \ + change { job.refresh.locked_by }.to(nil).and \ + change { job.refresh.locked_at }.to(nil) + end + end + + describe '.reserve' do + it 'reserves the first job available' do + expect { described_class.reserve(worker, 300) }.to \ + change { job.refresh.locked_by }.to('rspec').and(change { job.refresh.locked_at }) + end + end + + describe '#destroy' do + let(:params) do + super().merge(cron: '* * * * *') + end + + before do + job.payload_object = double + end + + context 'when the job should be rescheduled' do + before do + job.run_at = Time.now - 86_400 + job.payload_object = Struct.new(:schedule_instead_of_destroy).new(true) + end + + it 'does not destroy the job' do + expect { job.destroy }.not_to change(described_class, :count) + end + + it 'reschedules the job' do + expect { job.destroy }.to change(job, :run_at) + end + end + + it 'destroys the job' do + expect { job.destroy }.to change(described_class, :count).by(-1) + end + end + + describe '#schedule_next_run' do + let(:params) do + super().merge( + cron: '* * * * *', + locked_by: 'rspec', + locked_at: Time.now + ) + end + + it 'increases the number of attempts' do + expect { job.schedule_next_run }.to change(job, :attempts).by(1) + end + + it 'sets the run_at time' do + job.run_at = Time.now - 86_400 + expect { job.schedule_next_run }.to change(job, :run_at) + end + + it 'unlocks the job' do + expect { job.schedule_next_run }.to \ + change(job, :locked_by).to(nil).and change(job, :locked_at).to(nil) + end + end + + describe '#lock_exclusively!'
do + context 'when the job is not locked' do + it 'locks the job' do + expect { job.lock_exclusively!(300, 'rspec') }.to \ + change(job, :locked_by).to('rspec').and change(job, :locked_at) + end + end + + context 'when the job is locked by another worker' do + let(:params) { super().merge(locked_by: 'other', locked_at: Time.now) } + + it 'does not lock the job' do + expect { job.lock_exclusively!(300, 'rspec') }.not_to change(job, :locked_by) + end + end + + context 'when the job is locked by the current worker' do + let(:params) { super().merge(locked_by: 'rspec', locked_at: Time.now - 300) } + + it 'regains the lock' do + expect { job.lock_exclusively!(300, 'rspec') }.to change(job, :locked_at) + end + end + end +end diff --git a/spec/unit/document_transfer/job/cron/base_spec.rb b/spec/unit/document_transfer/job/cron/base_spec.rb new file mode 100644 index 0000000..3b82c19 --- /dev/null +++ b/spec/unit/document_transfer/job/cron/base_spec.rb @@ -0,0 +1,94 @@ +# frozen_string_literal: true + +require_relative '../../../../../lib/job/cron/base' + +RSpec.describe DocumentTransfer::Job::Cron::Base do + subject(:job) do + Class.new(described_class) do + self.cron_expression = '* * * * *' + + def perform + true + end + end + end + + let(:expected_time) do + now = Time.now + Time.new(now.year, now.month, now.day, now.hour, now.min) + 60 + end + + before do + stub_const('DocumentTransfer::Job::Cron::Test', job) + DocumentTransfer::Job.unschedule + end + + describe '.schedule' do + it 'schedules the job' do + expect(job.schedule.run_at).to be_a(Time) + end + + it 'sets the job to run at the next scheduled time' do + expect(job.schedule.run_at).to eq(expected_time) + end + + it 'does not reschedule the job' do + job.schedule + expect(job.schedule).to be_nil + end + end + + describe '.remove' do + it 'unschedules the job' do + record = job.schedule + expect(job.remove).to eq(record) + end + + it 'does not error if the job is not scheduled' do + expect(job.remove).to be_nil + end + end + + describe '.reschedule' do + it 'reschedules the job' do + record = job.schedule + expect(job.reschedule.id).not_to eq(record.id) + end + end + + describe '.scheduled?'
do + it 'returns true if the job is scheduled' do + job.schedule + expect(job.scheduled?).to be(true) + end + + it 'returns false if the job is not scheduled' do + expect(job.scheduled?).to be(false) + end + end + + describe 'delayed_job' do + it 'returns the delayed job' do + record = job.schedule + expect(job.delayed_job).to eq(record) + end + + it 'returns no job if nothing is scheduled' do + expect(job.delayed_job).to be_nil + end + end + + describe '#after' do + it 'sets the schedule_instead_of_destroy flag' do + record = job.new + record.after + expect(record.schedule_instead_of_destroy).to be(true) + end + end + + describe '#queue' do + it 'schedules the job' do + expect(job.new.queue.run_at).to eq(expected_time) + end + end +end diff --git a/spec/unit/document_transfer/job/cron/expire_key_spec.rb b/spec/unit/document_transfer/job/cron/expire_key_spec.rb new file mode 100644 index 0000000..89d8faf --- /dev/null +++ b/spec/unit/document_transfer/job/cron/expire_key_spec.rb @@ -0,0 +1,45 @@ +# frozen_string_literal: true + +require 'fugit' +require 'fugit/cron' + +require_relative '../../../../../lib/job/cron/expire_key' + +RSpec.describe DocumentTransfer::Job::Cron::ExpireKey do + subject(:job) { described_class.new } + + let!(:auth_keys) do + { + valid: create(:auth_key), + expired: create(:auth_key, expires: Time.now - 1), + inactive: create(:auth_key, expires: Time.now - 1, active: false) + } + end + + describe '.cron_expression' do + it 'contains a valid cron expression' do + expect { Fugit::Cron.do_parse(described_class.cron_expression) }.not_to \ + raise_error + end + end + + describe '#perform' do + it 'deactivates expired active keys' do + job.perform + expect(auth_keys[:expired].reload.active?).to be(false) + end + + it 'does not deactivate valid keys' do + job.perform + expect(auth_keys[:valid].reload.active?).to be(true) + end + + # We're not using a spy here, so we can't use `have_received`. + # rubocop:disable RSpec/MessageSpies + it 'does not deactivate inactive keys' do + expect(auth_keys[:expired]).not_to receive(:update) + job.perform + end + # rubocop:enable RSpec/MessageSpies + end +end diff --git a/spec/unit/document_transfer/job/queue_spec.rb b/spec/unit/document_transfer/job/queue_spec.rb new file mode 100644 index 0000000..21e3b7f --- /dev/null +++ b/spec/unit/document_transfer/job/queue_spec.rb @@ -0,0 +1,63 @@ +# frozen_string_literal: true + +require_relative '../../../../lib/job/queue' + +RSpec.describe DocumentTransfer::Job::Queue do + subject(:queue) { described_class.new } + + before do + create(:job, cron: '* * * * *', run_at: Time.now - 86_400, locked_at: Time.now - 30) + create(:job, run_at: Time.now - 3600) + create(:job, run_at: Time.now - 60) + create(:job, cron: '0 10 * * *') + create(:job, run_at: Time.now + 300) + create(:job, run_at: Time.now - 43_200, locked_at: Time.now - 30) + end + + describe '#stats' do + it 'returns statistics about the job queue' do + expect(queue.stats).to include( + recurring: 2, + running: 2, + total: 6, + waiting: 3 + ) + end + + # Comparing time values is tricky when working with sub-second precision. + # Convert the age to an integer to perform seconds-based comparisons. 
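+ # For example, an observed age of 3600.42 seconds truncates to 3600.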
+ it 'includes the age of the oldest non-recurring job in waiting' do + expect(queue.stats[:oldest].to_i).to eq(3600) + end + end + + describe '#recurring' do + it 'returns the number of jobs with cron schedules' do + expect(queue.recurring).to eq(2) + end + end + + describe '#oldest' do + it 'returns the age of the oldest non-recurring job in waiting' do + expect(queue.oldest.to_i).to eq(3600) + end + end + + describe '#running' do + it 'returns the number of jobs currently being worked' do + expect(queue.running).to eq(2) + end + end + + describe '#total' do + it 'returns the total number of jobs in the queue' do + expect(queue.total).to eq(6) + end + end + + describe '#waiting' do + it 'returns the number of jobs waiting to be worked' do + expect(queue.waiting).to eq(3) + end + end +end diff --git a/spec/unit/document_transfer/job_spec.rb b/spec/unit/document_transfer/job_spec.rb new file mode 100644 index 0000000..a766bc8 --- /dev/null +++ b/spec/unit/document_transfer/job_spec.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +require_relative '../../../lib/job' + +RSpec.describe DocumentTransfer::Job do + let(:recurring_jobs) do + ['ExpireKey'] + end + + describe '.schedule' do + it 'schedules all recurring jobs' do + described_class.schedule + recurring_jobs.each do |job| + klass = DocumentTransfer::Job::Cron.const_get(job) + expect(klass.scheduled?).to be(true) + end + end + end + + describe '.unschedule' do + it 'unschedules all recurring jobs' do + described_class.unschedule + recurring_jobs.each do |job| + klass = DocumentTransfer::Job::Cron.const_get(job) + expect(klass.scheduled?).to be(false) + end + end + end +end