From ff7cad34f3b10fcd0be61180486ca8a63222797a Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Thu, 22 Aug 2024 17:14:37 -0300 Subject: [PATCH] feat: Add async_update_lazy_attributes command to push lazy attr related jobs (#4) --- lib/esse/async_indexing.rb | 7 + lib/esse/async_indexing/cli.rb | 14 ++ lib/esse/async_indexing/cli/async_import.rb | 26 ++- .../cli/async_update_lazy_attributes.rb | 82 +++++++ .../async_indexing/jobs/import_ids_job.rb | 16 +- lib/esse/plugins/async_indexing.rb | 9 +- .../async_indexing/cli/async_import_spec.rb | 12 +- .../cli/async_update_lazy_attributes_spec.rb | 209 ++++++++++++++++++ .../jobs/import_ids_job_spec.rb | 1 + spec/esse/async_indexing_spec.rb | 26 +++ .../async_indexing/async_indexing_job_spec.rb | 2 +- 11 files changed, 380 insertions(+), 24 deletions(-) create mode 100644 lib/esse/async_indexing/cli/async_update_lazy_attributes.rb create mode 100644 spec/esse/async_indexing/cli/async_update_lazy_attributes_spec.rb diff --git a/lib/esse/async_indexing.rb b/lib/esse/async_indexing.rb index f39ad1f..7f48bf2 100755 --- a/lib/esse/async_indexing.rb +++ b/lib/esse/async_indexing.rb @@ -50,6 +50,13 @@ def self.async_indexing_repo?(repo) repo.respond_to?(:implement_batch_ids?) && repo.implement_batch_ids? end + + def self.plugin_installed?(index) + index = index.index if index.is_a?(Class) && index < Esse::Repository + return false unless index.is_a?(Class) && index < Esse::Index + + index.plugins.include?(Esse::Plugins::AsyncIndexing) + end end Esse::Config.__send__ :include, Esse::AsyncIndexing::Config diff --git a/lib/esse/async_indexing/cli.rb b/lib/esse/async_indexing/cli.rb index 71f41df..924d807 100644 --- a/lib/esse/async_indexing/cli.rb +++ b/lib/esse/async_indexing/cli.rb @@ -29,4 +29,18 @@ def async_import(*index_classes) require "esse/async_indexing/cli/async_import" Esse::AsyncIndexing::CLI::AsyncImport.new(indices: index_classes, **opts).run end + + desc "async_update_lazy_attributes INDEX_CLASS", "Async update lazy attributes for the given index" + option :repo, type: :string, default: nil, alias: "-r", desc: "Repository to use for import" + option :suffix, type: :string, default: nil, aliases: "-s", desc: "Suffix to append to index name" + option :context, type: :hash, default: {}, required: true, desc: "List of options to pass to the index class" + option :service, type: :string, default: nil, alias: "-s", desc: "Service to use for async import: sidekiq, faktory" + option :job_options, type: :hash, default: {}, desc: "List of options to pass to the background job. (Example: --job-options=queue:default)" + def async_update_lazy_attributes(index_class, *attributes) + opts = Esse::HashUtils.deep_transform_keys(options.to_h, &:to_sym) + opts[:service] ||= Esse.config.async_indexing.services.first + require "esse/async_indexing/cli/async_update_lazy_attributes" + + Esse::AsyncIndexing::CLI::AsyncUpdateLazyAttributes.new(indices: [index_class], attributes: attributes, **opts).run + end end diff --git a/lib/esse/async_indexing/cli/async_import.rb b/lib/esse/async_indexing/cli/async_import.rb index e9e2781..a314e91 100644 --- a/lib/esse/async_indexing/cli/async_import.rb +++ b/lib/esse/async_indexing/cli/async_import.rb @@ -5,9 +5,23 @@ class Esse::AsyncIndexing::CLI::AsyncImport < Esse::CLI::Index::BaseOperation WORKER_NAME = "Esse::AsyncIndexing::Jobs::ImportIdsJob" + attr_reader :job_options, :service_name + + def initialize(indices:, job_options: {}, service: nil, **options) + @job_options = job_options + @service_name = (service || Esse.config.async_indexing.services.first)&.to_sym + super(indices: indices, **options) + end + def run validate_options! indices.each do |index| + unless Esse::AsyncIndexing.plugin_installed?(index) + raise Esse::CLI::InvalidOption, <<~MSG + The #{index} index does not support async indexing. Make sure you have the `plugin :async_indexing` in your `#{index}` class. + MSG + end + repos = if (repo = @options[:repo]) [index.repo(repo)] else @@ -17,7 +31,7 @@ def run repos.each do |repo| unless Esse::AsyncIndexing.async_indexing_repo?(repo) raise Esse::CLI::InvalidOption, <<~MSG - The #{repo} repository does not support async indexing. Make sure you have the `plugin :async_indexing` in your `#{index}` class and the :#{repo.repo_name} collection implements the `#each_batch_ids` method. + The #{repo} repository does not support async indexing. Make sure the :#{repo.repo_name} collection of `#{index}` implements the `#each_batch_ids` method. MSG end @@ -42,21 +56,13 @@ def run def bulk_options @bulk_options ||= begin - hash = @options.slice(*@options.keys - Esse::CLI_IGNORE_OPTS - [:repo, :service, :job_options]) + hash = @options.slice(*@options.keys - Esse::CLI_IGNORE_OPTS - [:repo]) hash.delete(:context) if hash[:context].nil? || hash[:context].empty? hash end end - def job_options - @job_options ||= @options[:job_options] || {} - end - def validate_options! validate_indices_option! end - - def service_name - (@options[:service] || Esse.config.async_indexing.services.first)&.to_sym - end end diff --git a/lib/esse/async_indexing/cli/async_update_lazy_attributes.rb b/lib/esse/async_indexing/cli/async_update_lazy_attributes.rb new file mode 100644 index 0000000..add3322 --- /dev/null +++ b/lib/esse/async_indexing/cli/async_update_lazy_attributes.rb @@ -0,0 +1,82 @@ +# frozen_string_literal: true + +require "esse/cli/index/base_operation" + +class Esse::AsyncIndexing::CLI::AsyncUpdateLazyAttributes < Esse::CLI::Index::BaseOperation + WORKER_NAME = "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" + + attr_reader :attributes, :job_options, :service_name + + def initialize(indices:, attributes: nil, job_options: nil, service: nil, **options) + super(indices: indices, **options) + @attributes = Array(attributes) + @job_options = job_options || {} + @service_name = (service || Esse.config.async_indexing.services.first)&.to_sym + end + + def run + validate_options! + indices.each do |index| + unless Esse::AsyncIndexing.plugin_installed?(index) + raise Esse::CLI::InvalidOption, <<~MSG + The #{index} index does not support async indexing. Make sure you have the `plugin :async_indexing` in your `#{index}` class. + MSG + end + + repos = if (repo = @options[:repo]) + [index.repo(repo)] + else + index.repo_hash.values + end + + repos.each do |repo| + unless Esse::AsyncIndexing.async_indexing_repo?(repo) + raise Esse::CLI::InvalidOption, <<~MSG + The #{repo} repository does not support async indexing. Make sure the :#{repo.repo_name} collection of `#{index}` implements the `#each_batch_ids` method. + MSG + end + + attrs = repo_attributes(repo) + next unless attrs.any? + + enqueuer = if (caller = repo.async_indexing_jobs[:update_lazy_attribute]) + ->(ids) do + attrs.each do |attribute| + caller.call(service: service_name, repo: repo, operation: :update_lazy_attribute, attribute: attribute, ids: ids, **bulk_options) + end + end + else + ->(ids) do + attrs.each do |attribute| + BackgroundJob.job(service_name, WORKER_NAME, **job_options) + .with_args(repo.index.name, repo.repo_name, attribute.to_s, ids, Esse::HashUtils.deep_transform_keys(bulk_options, &:to_s)) + .push + end + end + end + + repo.batch_ids(**bulk_options.fetch(:context, {})).each(&enqueuer) + end + end + end + + private + + def bulk_options + @bulk_options ||= begin + hash = @options.slice(*@options.keys - Esse::CLI_IGNORE_OPTS - [:repo]) + hash.delete(:context) if hash[:context].nil? || hash[:context].empty? + hash + end + end + + def validate_options! + validate_indices_option! + end + + def repo_attributes(repo) + return repo.lazy_document_attributes.keys if attributes.empty? + + repo.lazy_document_attribute_names(attributes) + end +end diff --git a/lib/esse/async_indexing/jobs/import_ids_job.rb b/lib/esse/async_indexing/jobs/import_ids_job.rb index 76fd869..f07e247 100644 --- a/lib/esse/async_indexing/jobs/import_ids_job.rb +++ b/lib/esse/async_indexing/jobs/import_ids_job.rb @@ -1,8 +1,6 @@ # frozen_string_literal: true class Esse::AsyncIndexing::Jobs::ImportIdsJob - LAZY_ATTR_WORKER = "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" - def perform(index_class_name, repo_name, ids, options = {}) # This is specific to the AsyncIndexing plugin, can't pass to Esse import method enqueue_lazy = options.delete(:enqueue_lazy_attributes) if options.key?(:enqueue_lazy_attributes) @@ -16,12 +14,18 @@ def perform(index_class_name, repo_name, ids, options = {}) return total if lazy_already_imported?(options) return total unless self.class.respond_to?(:background_job_service) - _index_class, repo_class = Esse::AsyncIndexing::Actions::CoerceIndexRepository.call(index_class_name, repo_name) + index_class, repo_class = Esse::AsyncIndexing::Actions::CoerceIndexRepository.call(index_class_name, repo_name) + return total unless Esse::AsyncIndexing.plugin_installed?(index_class) repo_class.lazy_document_attributes.each_key do |attr_name| - BackgroundJob.job(self.class.background_job_service, LAZY_ATTR_WORKER) - .with_args(index_class_name, repo_name, attr_name.to_s, ids, options) - .push + repo_class.async_indexing_job_for(:update_lazy_attribute).call( + **options, + service: self.class.background_job_service, + repo: repo_class, + operation: :update_lazy_attribute, + attribute: attr_name, + ids: ids + ) end total end diff --git a/lib/esse/plugins/async_indexing.rb b/lib/esse/plugins/async_indexing.rb index 0cdae84..95b0a0a 100644 --- a/lib/esse/plugins/async_indexing.rb +++ b/lib/esse/plugins/async_indexing.rb @@ -32,6 +32,13 @@ module RepositoryClassMethods .with_args(repo.index.name, repo.repo_name, id, Esse::HashUtils.deep_transform_keys(kwargs, &:to_s)) .push end + }, + update_lazy_attribute: ->(service:, repo:, operation:, attribute:, ids:, **kwargs) { + unless (ids = Esse::ArrayUtils.wrap(ids)).empty? + BackgroundJob.job(service, "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob") + .with_args(repo.index.name, repo.repo_name, attribute.to_s, ids, Esse::HashUtils.deep_transform_keys(kwargs, &:to_s)) + .push + end } }.freeze @@ -93,7 +100,7 @@ def async_indexing_job_for(operation) end class AsyncIndexingJobValidator - OPERATIONS = %i[import index update delete].freeze + OPERATIONS = %i[import index update delete update_lazy_attribute].freeze def self.call(operations, block) unless block.is_a?(Proc) diff --git a/spec/esse/async_indexing/cli/async_import_spec.rb b/spec/esse/async_indexing/cli/async_import_spec.rb index fced0c2..01ecd31 100644 --- a/spec/esse/async_indexing/cli/async_import_spec.rb +++ b/spec/esse/async_indexing/cli/async_import_spec.rb @@ -59,7 +59,7 @@ def each_batch_ids it "raises an error if the repository does not have the async_indexing plugin" do expect { cli_exec(%w[index async_import CountiesIndex]) - }.to raise_error(Esse::CLI::InvalidOption, /The CountiesIndex::County repository does not support async indexing/) + }.to raise_error(Esse::CLI::InvalidOption, /The CountiesIndex index does not support async indexing. Make sure you have/) end end @@ -103,7 +103,7 @@ def each_batch_ids it "allows --update-lazy-attributes as a single value" do Esse.config.async_indexing.faktory cli_exec(%w[index async_import CitiesIndex --update-lazy-attributes=foo]) - expect{ "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("CitiesIndex", "city", [1, 2, 3], "update_lazy_attributes" => ["foo"]).on(:faktory) + expect { "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("CitiesIndex", "city", [1, 2, 3], "update_lazy_attributes" => ["foo"]).on(:faktory) end it "allows --update-lazy-attributes as multiple comma separated values" do @@ -195,14 +195,14 @@ def each_batch_ids it "enqueues the faktory job for the given index when passing --service=faktory" do cli_exec(%w[index async_import GeosIndex --service=faktory]) - expect{ "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("GeosIndex", "country", [1, 2, 3], {}).on(:faktory) - expect{ "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("GeosIndex", "city", [1, 2, 3], {}).on(:faktory) + expect { "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("GeosIndex", "country", [1, 2, 3], {}).on(:faktory) + expect { "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("GeosIndex", "city", [1, 2, 3], {}).on(:faktory) end it "enqueues the faktory job for the given index when passing --service=sidekiq" do cli_exec(%w[index async_import GeosIndex --service=sidekiq]) - expect{ "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("GeosIndex", "country", [1, 2, 3], {}).on(:sidekiq) - expect{ "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("GeosIndex", "city", [1, 2, 3], {}).on(:sidekiq) + expect { "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("GeosIndex", "country", [1, 2, 3], {}).on(:sidekiq) + expect { "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("GeosIndex", "city", [1, 2, 3], {}).on(:sidekiq) end end diff --git a/spec/esse/async_indexing/cli/async_update_lazy_attributes_spec.rb b/spec/esse/async_indexing/cli/async_update_lazy_attributes_spec.rb new file mode 100644 index 0000000..9b50b94 --- /dev/null +++ b/spec/esse/async_indexing/cli/async_update_lazy_attributes_spec.rb @@ -0,0 +1,209 @@ +# frozen_string_literal: true + +require "spec_helper" +require "support/cli_helpers" +require "esse/cli" +require "esse/async_indexing/cli/async_update_lazy_attributes" + +# rubocop:disable RSpec/ExpectActual +# rubocop:disable RSpec/AnyInstance +RSpec.describe "Esse::CLI::Index", type: :cli do + describe "#async_update_lazy_attributes" do + let(:index_collection_class) do + Class.new(Esse::Collection) do + def each_batch_ids + yield([1, 2, 3]) + end + end + end + + before do + Esse.config.async_indexing.faktory + Esse.config.async_indexing.sidekiq + end + + after do + reset_config! + end + + context "when passing undefined or invalid index name" do + it "raises an error if given argument is not a valid index class" do + expect { + cli_exec(%w[index async_update_lazy_attributes Esse::Config]) + }.to raise_error(Esse::CLI::InvalidOption, /Esse::Config must be a subclass of Esse::Index/) + end + + it "raises an error if given argument is not defined" do + expect { + cli_exec(%w[index async_update_lazy_attributes NotDefinedIndexName]) + }.to raise_error(Esse::CLI::InvalidOption, /Unrecognized index class: "NotDefinedIndexName"/) + end + end + + context "when passing an index that does not support async indexing" do + before do + collection_class = index_collection_class + stub_esse_index(:counties) do + repository :county do + collection collection_class + end + end + end + + it "raises an error if the repository does not have the async_indexing plugin" do + expect { + cli_exec(%w[index async_update_lazy_attributes CountiesIndex]) + }.to raise_error(Esse::CLI::InvalidOption, /The CountiesIndex index does not support async indexing. Make sure you have/) + end + end + + context "when passing a index with single repository that supports async indexing" do + before do + collection_class = index_collection_class + stub_esse_index(:cities) do + plugin :async_indexing + repository :city do + collection collection_class + lazy_document_attribute :total_events do |docs| + docs.map { |doc| [doc.id, 10] }.to_h + end + lazy_document_attribute :total_venues do |docs| + docs.map { |doc| [doc.id, 20] }.to_h + end + end + end + end + + it "enqueues the faktory job for the given index when passing --service=faktory" do + cli_exec(%w[index async_update_lazy_attributes CitiesIndex --service=faktory]) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.to have_enqueued_background_job("CitiesIndex", "city", "total_events", [1, 2, 3], {}).on(:faktory) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.to have_enqueued_background_job("CitiesIndex", "city", "total_venues", [1, 2, 3], {}).on(:faktory) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.not_to have_enqueued_background_job.on(:sidekiq) + end + + it "detects faktory as the default service name when not passed and is set in the configuration" do + Esse.config.async_indexing.faktory + cli_exec(%w[index async_update_lazy_attributes CitiesIndex]) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.to have_enqueued_background_job("CitiesIndex", "city", "total_events", [1, 2, 3], {}).on(:faktory) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.to have_enqueued_background_job("CitiesIndex", "city", "total_venues", [1, 2, 3], {}).on(:faktory) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.not_to have_enqueued_background_job.on(:sidekiq) + end + + it "enqueues the faktory job for the given index when passing --service=sidekiq" do + cli_exec(%w[index async_update_lazy_attributes CitiesIndex --service=sidekiq]) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.to have_enqueued_background_job("CitiesIndex", "city", "total_events", [1, 2, 3], {}).on(:sidekiq) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.to have_enqueued_background_job("CitiesIndex", "city", "total_venues", [1, 2, 3], {}).on(:sidekiq) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.not_to have_enqueued_background_job.on(:faktory) + end + + it "detects sidekiq as the default service name when not passed and is set in the configuration" do + Esse.config.async_indexing.sidekiq + cli_exec(%w[index async_update_lazy_attributes CitiesIndex]) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.to have_enqueued_background_job("CitiesIndex", "city", "total_events", [1, 2, 3], {}).on(:sidekiq) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.to have_enqueued_background_job("CitiesIndex", "city", "total_venues", [1, 2, 3], {}).on(:sidekiq) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.not_to have_enqueued_background_job.on(:faktory) + end + + it "enqueues only the specified lazy attribute job when the attribute is passed" do + cli_exec(%w[index async_update_lazy_attributes CitiesIndex total_events --service=faktory]) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.to have_enqueued_background_job("CitiesIndex", "city", "total_events", [1, 2, 3], {}).on(:faktory) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.not_to have_enqueued_background_job("CitiesIndex", "city", "total_venues", [1, 2, 3], {}).on(:faktory) + end + + it "removes invalid given attributes" do + cli_exec(%w[index async_update_lazy_attributes CitiesIndex total_venues invalid_attribute --service=faktory]) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.to have_enqueued_background_job("CitiesIndex", "city", "total_venues", [1, 2, 3], {}).on(:faktory) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.not_to have_enqueued_background_job("CitiesIndex", "city", "invalid_attribute", [1, 2, 3], {}).on(:faktory) + end + + it "does not enqueue when no valid lazy attributes are passed" do + cli_exec(%w[index async_update_lazy_attributes CitiesIndex invalid_attribute --service=faktory]) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.not_to have_enqueued_background_job.on(:faktory) + end + + it "allows --job-options with a Hash" do + Esse.config.async_indexing.faktory + cli_exec(%w[index async_update_lazy_attributes CitiesIndex --job-options=queue:bar]) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.to have_enqueued_background_job("CitiesIndex", "city", "total_events", [1, 2, 3], {}).on(:faktory).queue("bar") + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.to have_enqueued_background_job("CitiesIndex", "city", "total_venues", [1, 2, 3], {}).on(:faktory).queue("bar") + end + end + + context "when passing a index with multiple repositories that support async indexing" do + before do + collection_class = index_collection_class + stub_esse_index(:geos) do + plugin :async_indexing + repository :country do + collection collection_class + lazy_document_attribute :total_events do |docs| + docs.map { |doc| [doc.id, 10] }.to_h + end + end + repository :city do + collection collection_class + lazy_document_attribute :total_events do |docs| + docs.map { |doc| [doc.id, 10] }.to_h + end + end + end + end + + it "enqueues the faktory job for the given index when passing --service=faktory" do + cli_exec(%w[index async_update_lazy_attributes GeosIndex --service=faktory]) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.to have_enqueued_background_job("GeosIndex", "country", "total_events", [1, 2, 3], {}).on(:faktory) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.to have_enqueued_background_job("GeosIndex", "city", "total_events", [1, 2, 3], {}).on(:faktory) + end + + it "enqueues the faktory job for the given index when passing --service=sidekiq" do + cli_exec(%w[index async_update_lazy_attributes GeosIndex --service=sidekiq]) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.to have_enqueued_background_job("GeosIndex", "country", "total_events", [1, 2, 3], {}).on(:sidekiq) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.to have_enqueued_background_job("GeosIndex", "city", "total_events", [1, 2, 3], {}).on(:sidekiq) + end + end + + context "when passing a index with a custom indexing job defined" do + before do + collection_class = index_collection_class + stub_esse_index(:geos) do + plugin :async_indexing + repository :country do + collection collection_class + lazy_document_attribute :total_events do |docs| + docs.map { |doc| [doc.id, 10] }.to_h + end + async_indexing_job(:update_lazy_attribute) do |**options| + Thread.current[:custom_job] = options + end + end + end + end + + it "enqueues the custom job for the given index when passing --service=faktory" do + cli_exec(%w[index async_update_lazy_attributes GeosIndex --service=faktory]) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.not_to have_enqueued_background_job + expect(Thread.current[:custom_job]).to eq( + service: :faktory, + repo: GeosIndex::Country, + operation: :update_lazy_attribute, + attribute: :total_events, + ids: [1, 2, 3] + ) + end + + it "enqueues the custom job for the given index when passing --service=sidekiq" do + cli_exec(%w[index async_update_lazy_attributes GeosIndex --service=sidekiq]) + expect { "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob" }.not_to have_enqueued_background_job + expect(Thread.current[:custom_job]).to eq( + service: :sidekiq, + repo: GeosIndex::Country, + operation: :update_lazy_attribute, + attribute: :total_events, + ids: [1, 2, 3] + ) + end + end + end +end +# rubocop:enable RSpec/ExpectActual +# rubocop:enable RSpec/AnyInstance diff --git a/spec/esse/async_indexing/jobs/import_ids_job_spec.rb b/spec/esse/async_indexing/jobs/import_ids_job_spec.rb index b0da08e..55f0137 100644 --- a/spec/esse/async_indexing/jobs/import_ids_job_spec.rb +++ b/spec/esse/async_indexing/jobs/import_ids_job_spec.rb @@ -33,6 +33,7 @@ context "when the worker has lazy_document_attributes" do before do stub_esse_index(:geos) do + plugin :async_indexing repository :city do collection { |**, &block| block.call([{id: 1, name: "City 1"}]) } document { |hash, **| {_id: hash[:id], name: hash[:name]} } diff --git a/spec/esse/async_indexing_spec.rb b/spec/esse/async_indexing_spec.rb index 04bddcd..a3bb29c 100755 --- a/spec/esse/async_indexing_spec.rb +++ b/spec/esse/async_indexing_spec.rb @@ -88,4 +88,30 @@ def each_batch_ids expect { described_class.service_name(:invalid) }.to raise_error(ArgumentError, "Invalid service: :invalid, valid services are: sidekiq, faktory") end end + + describe ".plugin_installed?" do + it "returns false when the given index is not a Esse::Index" do + expect(described_class.plugin_installed?(nil)).to be(false) + expect(described_class.plugin_installed?(Object)).to be(false) + expect(described_class.plugin_installed?(Esse::Repository)).to be(false) + expect(described_class.plugin_installed?(Esse::Index)).to be(false) + end + + it "returns false when the given index does not have the :async_indexing plugin" do + stub_esse_index(:geos) do + repository :state, const: true + end + expect(described_class.plugin_installed?(GeosIndex)).to be(false) + expect(described_class.plugin_installed?(GeosIndex::State)).to be(false) + end + + it "returns true when the given index have the :async_indexing plugin" do + stub_esse_index(:geos) do + plugin :async_indexing + repository :state, const: true + end + expect(described_class.plugin_installed?(GeosIndex)).to be(true) + expect(described_class.plugin_installed?(GeosIndex::State)).to be(true) + end + end end diff --git a/spec/esse/plugin/async_indexing/async_indexing_job_spec.rb b/spec/esse/plugin/async_indexing/async_indexing_job_spec.rb index e2ccdde..dfa88de 100644 --- a/spec/esse/plugin/async_indexing/async_indexing_job_spec.rb +++ b/spec/esse/plugin/async_indexing/async_indexing_job_spec.rb @@ -10,7 +10,7 @@ async_indexing_job { |service, repo, op, ids, **kwargs| } end end - expect(GeosIndex::State.async_indexing_jobs.keys).to match_array(%i[import index update delete]) + expect(GeosIndex::State.async_indexing_jobs.keys).to match_array(%i[import index update delete update_lazy_attribute]) expect(GeosIndex::State.async_indexing_jobs).to be_frozen end