Skip to content

Commit

Permalink
feat: Add async_update_lazy_attributes command to push lazy attr rela…
Browse files Browse the repository at this point in the history
…ted jobs (#4)
  • Loading branch information
marcosgz authored Aug 22, 2024
1 parent dfa61d6 commit ff7cad3
Show file tree
Hide file tree
Showing 11 changed files with 380 additions and 24 deletions.
7 changes: 7 additions & 0 deletions lib/esse/async_indexing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ def self.async_indexing_repo?(repo)

repo.respond_to?(:implement_batch_ids?) && repo.implement_batch_ids?
end

def self.plugin_installed?(index)
index = index.index if index.is_a?(Class) && index < Esse::Repository
return false unless index.is_a?(Class) && index < Esse::Index

index.plugins.include?(Esse::Plugins::AsyncIndexing)
end
end

Esse::Config.__send__ :include, Esse::AsyncIndexing::Config
Expand Down
14 changes: 14 additions & 0 deletions lib/esse/async_indexing/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,18 @@ def async_import(*index_classes)
require "esse/async_indexing/cli/async_import"
Esse::AsyncIndexing::CLI::AsyncImport.new(indices: index_classes, **opts).run
end

desc "async_update_lazy_attributes INDEX_CLASS", "Async update lazy attributes for the given index"
option :repo, type: :string, default: nil, alias: "-r", desc: "Repository to use for import"
option :suffix, type: :string, default: nil, aliases: "-s", desc: "Suffix to append to index name"
option :context, type: :hash, default: {}, required: true, desc: "List of options to pass to the index class"
option :service, type: :string, default: nil, alias: "-s", desc: "Service to use for async import: sidekiq, faktory"
option :job_options, type: :hash, default: {}, desc: "List of options to pass to the background job. (Example: --job-options=queue:default)"
def async_update_lazy_attributes(index_class, *attributes)
opts = Esse::HashUtils.deep_transform_keys(options.to_h, &:to_sym)
opts[:service] ||= Esse.config.async_indexing.services.first
require "esse/async_indexing/cli/async_update_lazy_attributes"

Esse::AsyncIndexing::CLI::AsyncUpdateLazyAttributes.new(indices: [index_class], attributes: attributes, **opts).run
end
end
26 changes: 16 additions & 10 deletions lib/esse/async_indexing/cli/async_import.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,23 @@
class Esse::AsyncIndexing::CLI::AsyncImport < Esse::CLI::Index::BaseOperation
WORKER_NAME = "Esse::AsyncIndexing::Jobs::ImportIdsJob"

attr_reader :job_options, :service_name

def initialize(indices:, job_options: {}, service: nil, **options)
@job_options = job_options
@service_name = (service || Esse.config.async_indexing.services.first)&.to_sym
super(indices: indices, **options)
end

def run
validate_options!
indices.each do |index|
unless Esse::AsyncIndexing.plugin_installed?(index)
raise Esse::CLI::InvalidOption, <<~MSG
The #{index} index does not support async indexing. Make sure you have the `plugin :async_indexing` in your `#{index}` class.
MSG
end

repos = if (repo = @options[:repo])
[index.repo(repo)]
else
Expand All @@ -17,7 +31,7 @@ def run
repos.each do |repo|
unless Esse::AsyncIndexing.async_indexing_repo?(repo)
raise Esse::CLI::InvalidOption, <<~MSG
The #{repo} repository does not support async indexing. Make sure you have the `plugin :async_indexing` in your `#{index}` class and the :#{repo.repo_name} collection implements the `#each_batch_ids` method.
The #{repo} repository does not support async indexing. Make sure the :#{repo.repo_name} collection of `#{index}` implements the `#each_batch_ids` method.
MSG
end

Expand All @@ -42,21 +56,13 @@ def run

def bulk_options
@bulk_options ||= begin
hash = @options.slice(*@options.keys - Esse::CLI_IGNORE_OPTS - [:repo, :service, :job_options])
hash = @options.slice(*@options.keys - Esse::CLI_IGNORE_OPTS - [:repo])
hash.delete(:context) if hash[:context].nil? || hash[:context].empty?
hash
end
end

def job_options
@job_options ||= @options[:job_options] || {}
end

def validate_options!
validate_indices_option!
end

def service_name
(@options[:service] || Esse.config.async_indexing.services.first)&.to_sym
end
end
82 changes: 82 additions & 0 deletions lib/esse/async_indexing/cli/async_update_lazy_attributes.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# frozen_string_literal: true

require "esse/cli/index/base_operation"

class Esse::AsyncIndexing::CLI::AsyncUpdateLazyAttributes < Esse::CLI::Index::BaseOperation
WORKER_NAME = "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob"

attr_reader :attributes, :job_options, :service_name

def initialize(indices:, attributes: nil, job_options: nil, service: nil, **options)
super(indices: indices, **options)
@attributes = Array(attributes)
@job_options = job_options || {}
@service_name = (service || Esse.config.async_indexing.services.first)&.to_sym
end

def run
validate_options!
indices.each do |index|
unless Esse::AsyncIndexing.plugin_installed?(index)
raise Esse::CLI::InvalidOption, <<~MSG
The #{index} index does not support async indexing. Make sure you have the `plugin :async_indexing` in your `#{index}` class.
MSG
end

repos = if (repo = @options[:repo])
[index.repo(repo)]
else
index.repo_hash.values
end

repos.each do |repo|
unless Esse::AsyncIndexing.async_indexing_repo?(repo)
raise Esse::CLI::InvalidOption, <<~MSG
The #{repo} repository does not support async indexing. Make sure the :#{repo.repo_name} collection of `#{index}` implements the `#each_batch_ids` method.
MSG
end

attrs = repo_attributes(repo)
next unless attrs.any?

enqueuer = if (caller = repo.async_indexing_jobs[:update_lazy_attribute])
->(ids) do
attrs.each do |attribute|
caller.call(service: service_name, repo: repo, operation: :update_lazy_attribute, attribute: attribute, ids: ids, **bulk_options)
end
end
else
->(ids) do
attrs.each do |attribute|
BackgroundJob.job(service_name, WORKER_NAME, **job_options)
.with_args(repo.index.name, repo.repo_name, attribute.to_s, ids, Esse::HashUtils.deep_transform_keys(bulk_options, &:to_s))
.push
end
end
end

repo.batch_ids(**bulk_options.fetch(:context, {})).each(&enqueuer)
end
end
end

private

def bulk_options
@bulk_options ||= begin
hash = @options.slice(*@options.keys - Esse::CLI_IGNORE_OPTS - [:repo])
hash.delete(:context) if hash[:context].nil? || hash[:context].empty?
hash
end
end

def validate_options!
validate_indices_option!
end

def repo_attributes(repo)
return repo.lazy_document_attributes.keys if attributes.empty?

repo.lazy_document_attribute_names(attributes)
end
end
16 changes: 10 additions & 6 deletions lib/esse/async_indexing/jobs/import_ids_job.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# frozen_string_literal: true

class Esse::AsyncIndexing::Jobs::ImportIdsJob
LAZY_ATTR_WORKER = "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob"

def perform(index_class_name, repo_name, ids, options = {})
# This is specific to the AsyncIndexing plugin, can't pass to Esse import method
enqueue_lazy = options.delete(:enqueue_lazy_attributes) if options.key?(:enqueue_lazy_attributes)
Expand All @@ -16,12 +14,18 @@ def perform(index_class_name, repo_name, ids, options = {})
return total if lazy_already_imported?(options)
return total unless self.class.respond_to?(:background_job_service)

_index_class, repo_class = Esse::AsyncIndexing::Actions::CoerceIndexRepository.call(index_class_name, repo_name)
index_class, repo_class = Esse::AsyncIndexing::Actions::CoerceIndexRepository.call(index_class_name, repo_name)
return total unless Esse::AsyncIndexing.plugin_installed?(index_class)

repo_class.lazy_document_attributes.each_key do |attr_name|
BackgroundJob.job(self.class.background_job_service, LAZY_ATTR_WORKER)
.with_args(index_class_name, repo_name, attr_name.to_s, ids, options)
.push
repo_class.async_indexing_job_for(:update_lazy_attribute).call(
**options,
service: self.class.background_job_service,
repo: repo_class,
operation: :update_lazy_attribute,
attribute: attr_name,
ids: ids
)
end
total
end
Expand Down
9 changes: 8 additions & 1 deletion lib/esse/plugins/async_indexing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ module RepositoryClassMethods
.with_args(repo.index.name, repo.repo_name, id, Esse::HashUtils.deep_transform_keys(kwargs, &:to_s))
.push
end
},
update_lazy_attribute: ->(service:, repo:, operation:, attribute:, ids:, **kwargs) {
unless (ids = Esse::ArrayUtils.wrap(ids)).empty?
BackgroundJob.job(service, "Esse::AsyncIndexing::Jobs::BulkUpdateLazyAttributeJob")
.with_args(repo.index.name, repo.repo_name, attribute.to_s, ids, Esse::HashUtils.deep_transform_keys(kwargs, &:to_s))
.push
end
}
}.freeze

Expand Down Expand Up @@ -93,7 +100,7 @@ def async_indexing_job_for(operation)
end

class AsyncIndexingJobValidator
OPERATIONS = %i[import index update delete].freeze
OPERATIONS = %i[import index update delete update_lazy_attribute].freeze

def self.call(operations, block)
unless block.is_a?(Proc)
Expand Down
12 changes: 6 additions & 6 deletions spec/esse/async_indexing/cli/async_import_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def each_batch_ids
it "raises an error if the repository does not have the async_indexing plugin" do
expect {
cli_exec(%w[index async_import CountiesIndex])
}.to raise_error(Esse::CLI::InvalidOption, /The CountiesIndex::County repository does not support async indexing/)
}.to raise_error(Esse::CLI::InvalidOption, /The CountiesIndex index does not support async indexing. Make sure you have/)
end
end

Expand Down Expand Up @@ -103,7 +103,7 @@ def each_batch_ids
it "allows --update-lazy-attributes as a single value" do
Esse.config.async_indexing.faktory
cli_exec(%w[index async_import CitiesIndex --update-lazy-attributes=foo])
expect{ "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("CitiesIndex", "city", [1, 2, 3], "update_lazy_attributes" => ["foo"]).on(:faktory)
expect { "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("CitiesIndex", "city", [1, 2, 3], "update_lazy_attributes" => ["foo"]).on(:faktory)
end

it "allows --update-lazy-attributes as multiple comma separated values" do
Expand Down Expand Up @@ -195,14 +195,14 @@ def each_batch_ids

it "enqueues the faktory job for the given index when passing --service=faktory" do
cli_exec(%w[index async_import GeosIndex --service=faktory])
expect{ "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("GeosIndex", "country", [1, 2, 3], {}).on(:faktory)
expect{ "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("GeosIndex", "city", [1, 2, 3], {}).on(:faktory)
expect { "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("GeosIndex", "country", [1, 2, 3], {}).on(:faktory)
expect { "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("GeosIndex", "city", [1, 2, 3], {}).on(:faktory)
end

it "enqueues the faktory job for the given index when passing --service=sidekiq" do
cli_exec(%w[index async_import GeosIndex --service=sidekiq])
expect{ "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("GeosIndex", "country", [1, 2, 3], {}).on(:sidekiq)
expect{ "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("GeosIndex", "city", [1, 2, 3], {}).on(:sidekiq)
expect { "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("GeosIndex", "country", [1, 2, 3], {}).on(:sidekiq)
expect { "Esse::AsyncIndexing::Jobs::ImportIdsJob" }.to have_enqueued_background_job("GeosIndex", "city", [1, 2, 3], {}).on(:sidekiq)
end
end

Expand Down
Loading

0 comments on commit ff7cad3

Please sign in to comment.