diff --git a/Gemfile.lock b/Gemfile.lock index 3637805..10db436 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - esse (0.4.0.rc1) + esse (0.4.0.rc2) multi_json thor (>= 0.19) @@ -86,7 +86,7 @@ GEM lint_roller (~> 1.0) rubocop-performance (~> 1.16.0) strscan (3.1.0) - thor (1.3.1) + thor (1.3.2) unicode-display_width (2.5.0) webmock (3.23.1) addressable (>= 2.8.0) diff --git a/gemfiles/Gemfile.elasticsearch-1.x.lock b/gemfiles/Gemfile.elasticsearch-1.x.lock index 24f54b7..08a30f9 100644 --- a/gemfiles/Gemfile.elasticsearch-1.x.lock +++ b/gemfiles/Gemfile.elasticsearch-1.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc1) + esse (0.4.0.rc2) multi_json thor (>= 0.19) @@ -97,7 +97,7 @@ GEM lint_roller (~> 1.0) rubocop-performance (~> 1.16.0) strscan (3.1.0) - thor (1.3.1) + thor (1.3.2) unicode-display_width (2.5.0) webmock (3.23.1) addressable (>= 2.8.0) diff --git a/gemfiles/Gemfile.elasticsearch-2.x.lock b/gemfiles/Gemfile.elasticsearch-2.x.lock index 6170440..6bb2f70 100644 --- a/gemfiles/Gemfile.elasticsearch-2.x.lock +++ b/gemfiles/Gemfile.elasticsearch-2.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc1) + esse (0.4.0.rc2) multi_json thor (>= 0.19) @@ -101,7 +101,7 @@ GEM lint_roller (~> 1.0) rubocop-performance (~> 1.16.0) strscan (3.1.0) - thor (1.3.1) + thor (1.3.2) unicode-display_width (2.5.0) webmock (3.23.1) addressable (>= 2.8.0) diff --git a/gemfiles/Gemfile.elasticsearch-5.x.lock b/gemfiles/Gemfile.elasticsearch-5.x.lock index 418a138..f4b4952 100644 --- a/gemfiles/Gemfile.elasticsearch-5.x.lock +++ b/gemfiles/Gemfile.elasticsearch-5.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc1) + esse (0.4.0.rc2) multi_json thor (>= 0.19) @@ -101,7 +101,7 @@ GEM lint_roller (~> 1.0) rubocop-performance (~> 1.16.0) strscan (3.1.0) - thor (1.3.1) + thor (1.3.2) unicode-display_width (2.5.0) webmock (3.23.1) addressable (>= 2.8.0) diff --git a/gemfiles/Gemfile.elasticsearch-6.x.lock b/gemfiles/Gemfile.elasticsearch-6.x.lock index 242087c..6b79618 100644 --- a/gemfiles/Gemfile.elasticsearch-6.x.lock +++ b/gemfiles/Gemfile.elasticsearch-6.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc1) + esse (0.4.0.rc2) multi_json thor (>= 0.19) @@ -119,7 +119,7 @@ GEM lint_roller (~> 1.0) rubocop-performance (~> 1.16.0) strscan (3.1.0) - thor (1.3.1) + thor (1.3.2) unicode-display_width (2.5.0) webmock (3.23.1) addressable (>= 2.8.0) diff --git a/gemfiles/Gemfile.elasticsearch-7.x.lock b/gemfiles/Gemfile.elasticsearch-7.x.lock index 131a12f..b055529 100644 --- a/gemfiles/Gemfile.elasticsearch-7.x.lock +++ b/gemfiles/Gemfile.elasticsearch-7.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc1) + esse (0.4.0.rc2) multi_json thor (>= 0.19) @@ -102,7 +102,7 @@ GEM lint_roller (~> 1.0) rubocop-performance (~> 1.16.0) strscan (3.1.0) - thor (1.3.1) + thor (1.3.2) unicode-display_width (2.5.0) webmock (3.23.1) addressable (>= 2.8.0) diff --git a/gemfiles/Gemfile.elasticsearch-8.x.lock b/gemfiles/Gemfile.elasticsearch-8.x.lock index bffe836..e47e461 100644 --- a/gemfiles/Gemfile.elasticsearch-8.x.lock +++ b/gemfiles/Gemfile.elasticsearch-8.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc1) + esse (0.4.0.rc2) multi_json thor (>= 0.19) @@ -101,7 +101,7 @@ GEM lint_roller (~> 1.0) rubocop-performance (~> 1.16.0) strscan (3.1.0) - thor (1.3.1) + thor (1.3.2) unicode-display_width (2.5.0) webmock (3.23.1) addressable (>= 2.8.0) diff --git a/gemfiles/Gemfile.opensearch-1.x.lock b/gemfiles/Gemfile.opensearch-1.x.lock index 99463dd..780d07f 100644 --- a/gemfiles/Gemfile.opensearch-1.x.lock +++ b/gemfiles/Gemfile.opensearch-1.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc1) + esse (0.4.0.rc2) multi_json thor (>= 0.19) @@ -101,7 +101,7 @@ GEM lint_roller (~> 1.0) rubocop-performance (~> 1.16.0) strscan (3.1.0) - thor (1.3.1) + thor (1.3.2) unicode-display_width (2.5.0) webmock (3.23.1) addressable (>= 2.8.0) diff --git a/gemfiles/Gemfile.opensearch-2.x.lock b/gemfiles/Gemfile.opensearch-2.x.lock index 438ab0f..b36c8f7 100644 --- a/gemfiles/Gemfile.opensearch-2.x.lock +++ b/gemfiles/Gemfile.opensearch-2.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc1) + esse (0.4.0.rc2) multi_json thor (>= 0.19) @@ -101,7 +101,7 @@ GEM lint_roller (~> 1.0) rubocop-performance (~> 1.16.0) strscan (3.1.0) - thor (1.3.1) + thor (1.3.2) unicode-display_width (2.5.0) webmock (3.23.1) addressable (>= 2.8.0) diff --git a/lib/esse/errors.rb b/lib/esse/errors.rb index 2780e8d..0c788b8 100644 --- a/lib/esse/errors.rb +++ b/lib/esse/errors.rb @@ -15,6 +15,19 @@ def initialize end end + class BulkResponseError < ::Esse::Error + attr_reader :response + + def initialize(response) + @response = response + super(response) + end + + def items + response.fetch('items', []).select { |item| item.values.dig(0, 'error') } + end + end + ES_TRANSPORT_ERRORS = { 'MultipleChoices' => 'MultipleChoicesError', # 300 'MovedPermanently' => 'MovedPermanentlyError', # 301 diff --git a/lib/esse/import/bulk.rb b/lib/esse/import/bulk.rb index 60c9f69..9b15813 100644 --- a/lib/esse/import/bulk.rb +++ b/lib/esse/import/bulk.rb @@ -49,9 +49,7 @@ def each_request(max_retries: 4, last_retry_in_small_chunks: true) requests.each do |request| next unless request.body? resp = yield request - if resp&.[]('errors') - raise resp&.fetch('items', [])&.select { |item| item.values.first['error'] }&.join("\n") - end + raise Esse::Transport::BulkResponseError.new(resp) if resp&.[]('errors') end rescue Faraday::TimeoutError, Esse::Transport::RequestTimeoutError => e retry_count += 1 diff --git a/lib/esse/repository/documents.rb b/lib/esse/repository/documents.rb index 69e4f3b..1402876 100644 --- a/lib/esse/repository/documents.rb +++ b/lib/esse/repository/documents.rb @@ -10,8 +10,31 @@ def import(**kwargs) def update_documents_attribute(name, ids_or_doc_headers = [], kwargs = {}) batch = documents_for_lazy_attribute(name, ids_or_doc_headers) return if batch.empty? + kwargs = kwargs.transform_keys(&:to_sym) - index.bulk(**kwargs.transform_keys(&:to_sym), update: batch) + if kwargs.delete(:index_on_missing) { true } + begin + index.bulk(**kwargs, update: batch) + rescue Esse::Transport::BulkResponseError => ex + ids = ex.items.map { |item| item.dig('update', '_id') }.compact + raise ex if ids.empty? + + each_serialized_batch(eager_load_lazy_attributes: false, preload_lazy_attributes: false, id: ids) do |entries| + entries.each do |entry| + partial_doc = batch.find { |doc| doc.eql?(entry, match_lazy_doc_header: true) } + next unless partial_doc + + partial_doc.source.each do |attr_name, attr_value| + entry.mutate(attr_name) { attr_value } + end + end + + index.bulk(**kwargs, index: entries) + end + end + else + index.bulk(**kwargs, update: batch) + end end def documents_for_lazy_attribute(name, ids_or_doc_headers) diff --git a/lib/esse/version.rb b/lib/esse/version.rb index b4b4bf7..d13131b 100644 --- a/lib/esse/version.rb +++ b/lib/esse/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Esse - VERSION = '0.4.0.rc1' + VERSION = '0.4.0.rc2' end diff --git a/spec/support/shared_contexts/geos_index_definition.rb b/spec/support/shared_contexts/geos_index_definition.rb index 5a22722..98f684c 100644 --- a/spec/support/shared_contexts/geos_index_definition.rb +++ b/spec/support/shared_contexts/geos_index_definition.rb @@ -70,6 +70,9 @@ def source collection do |**context, &block| dataset.fetch(:state).each do |batch| states = context[:conditions] ? batch.select(&context[:conditions]) : batch + if (ids = Esse::ArrayUtils.wrap(context[:id]).map(&:to_i)).any? + states = states.select { |s| ids.include?(s[:id]) } + end block.call(states, **context) unless states.empty? end end @@ -79,6 +82,9 @@ def source collection do |**context, &block| dataset.fetch(:county).each do |batch| counties = context[:conditions] ? batch.select(&context[:conditions]) : batch + if (ids = Esse::ArrayUtils.wrap(context[:id]).map(&:to_i)).any? + counties = counties.select { |s| ids.include?(s[:id]) } + end block.call(counties, **context) unless counties.empty? end end diff --git a/spec/support/shared_contexts/stories_index_definition.rb b/spec/support/shared_contexts/stories_index_definition.rb index f8dd186..4e4d695 100644 --- a/spec/support/shared_contexts/stories_index_definition.rb +++ b/spec/support/shared_contexts/stories_index_definition.rb @@ -38,6 +38,9 @@ repository :story do collection do |**context, &block| stories = context[:conditions] ? ds.select(&context[:conditions]) : ds + if (ids = Esse::ArrayUtils.wrap(context[:id]).map(&:to_i)).any? + stories = stories.select { |s| ids.include?(s[:id]) } + end block.call(stories, **context) unless stories.empty? end document do |story, **context| diff --git a/spec/support/shared_contexts/venues_index_definition.rb b/spec/support/shared_contexts/venues_index_definition.rb index eca38aa..01a6dfc 100644 --- a/spec/support/shared_contexts/venues_index_definition.rb +++ b/spec/support/shared_contexts/venues_index_definition.rb @@ -22,6 +22,9 @@ stub_index(:venues) do repository :default do collection do |**context, &block| + if (ids = Esse::ArrayUtils.wrap(context[:id]).map(&:to_i)).any? + ds = ds.select { |s| ids.include?(s[:id]) } + end block.call(ds, **context) unless ds.empty? end document do |venue, **context| diff --git a/spec/support/shared_examples/repository_documents_update_documents_attribute.rb b/spec/support/shared_examples/repository_documents_update_documents_attribute.rb index d076dc5..fe83ae9 100644 --- a/spec/support/shared_examples/repository_documents_update_documents_attribute.rb +++ b/spec/support/shared_examples/repository_documents_update_documents_attribute.rb @@ -33,6 +33,49 @@ end end + context 'when document does not exist' do + include_context 'with stories index definition' + + it 'raises the Esse::Transport::BulkResponseError error when passing index_on_missing: false' do + es_client do |client, _conf, cluster| + StoriesIndex.create_index(alias: true) + + expect { + StoriesIndex::Story.update_documents_attribute(:tags, [ + { _id: '1001', routing: 'nyt' }, + ], index_on_missing: false) + }.to raise_error(Esse::Transport::BulkResponseError) + end + end + + it 'retries importing the document when passing index_on_missing: true' do + es_client do |client, _conf, cluster| + StoriesIndex.create_index(alias: true) + + resp = nil + expect { + resp = StoriesIndex::Story.import(context: { conditions: ->(s) { s[:id] == 1_001 } }) + }.not_to raise_error + + StoriesIndex.refresh + expect(StoriesIndex.count).to eq(1) + + expect { + resp = StoriesIndex::Story.update_documents_attribute(:tags, [ + { _id: '1002', routing: 'nyt' }, + { _id: '1003', routing: 'nyt' } + ], refresh: true) + }.not_to raise_error(Esse::Transport::BulkResponseError) + + StoriesIndex.refresh + expect(StoriesIndex.count).to eq(nyt_stories.size) + + doc = StoriesIndex.get(id: '1002', routing: 'nyt') + expect(doc.dig('_source', 'publication')).to eq('nyt') + end + end + end + context 'when the index does not have routing' do include_context 'with geos index definition'