Skip to content

Commit

Permalink
feat: update_documents_attribute now automatically retry with bulk in…
Browse files Browse the repository at this point in the history
…dex when doc does not exist. Pass index_on_missing: false to keep previous behaviour
  • Loading branch information
marcosgz committed Sep 11, 2024
1 parent 028943e commit 0146d4f
Show file tree
Hide file tree
Showing 17 changed files with 112 additions and 23 deletions.
4 changes: 2 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
esse (0.4.0.rc1)
esse (0.4.0.rc2)
multi_json
thor (>= 0.19)

Expand Down Expand Up @@ -86,7 +86,7 @@ GEM
lint_roller (~> 1.0)
rubocop-performance (~> 1.16.0)
strscan (3.1.0)
thor (1.3.1)
thor (1.3.2)
unicode-display_width (2.5.0)
webmock (3.23.1)
addressable (>= 2.8.0)
Expand Down
4 changes: 2 additions & 2 deletions gemfiles/Gemfile.elasticsearch-1.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc1)
esse (0.4.0.rc2)
multi_json
thor (>= 0.19)

Expand Down Expand Up @@ -97,7 +97,7 @@ GEM
lint_roller (~> 1.0)
rubocop-performance (~> 1.16.0)
strscan (3.1.0)
thor (1.3.1)
thor (1.3.2)
unicode-display_width (2.5.0)
webmock (3.23.1)
addressable (>= 2.8.0)
Expand Down
4 changes: 2 additions & 2 deletions gemfiles/Gemfile.elasticsearch-2.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc1)
esse (0.4.0.rc2)
multi_json
thor (>= 0.19)

Expand Down Expand Up @@ -101,7 +101,7 @@ GEM
lint_roller (~> 1.0)
rubocop-performance (~> 1.16.0)
strscan (3.1.0)
thor (1.3.1)
thor (1.3.2)
unicode-display_width (2.5.0)
webmock (3.23.1)
addressable (>= 2.8.0)
Expand Down
4 changes: 2 additions & 2 deletions gemfiles/Gemfile.elasticsearch-5.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc1)
esse (0.4.0.rc2)
multi_json
thor (>= 0.19)

Expand Down Expand Up @@ -101,7 +101,7 @@ GEM
lint_roller (~> 1.0)
rubocop-performance (~> 1.16.0)
strscan (3.1.0)
thor (1.3.1)
thor (1.3.2)
unicode-display_width (2.5.0)
webmock (3.23.1)
addressable (>= 2.8.0)
Expand Down
4 changes: 2 additions & 2 deletions gemfiles/Gemfile.elasticsearch-6.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc1)
esse (0.4.0.rc2)
multi_json
thor (>= 0.19)

Expand Down Expand Up @@ -119,7 +119,7 @@ GEM
lint_roller (~> 1.0)
rubocop-performance (~> 1.16.0)
strscan (3.1.0)
thor (1.3.1)
thor (1.3.2)
unicode-display_width (2.5.0)
webmock (3.23.1)
addressable (>= 2.8.0)
Expand Down
4 changes: 2 additions & 2 deletions gemfiles/Gemfile.elasticsearch-7.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc1)
esse (0.4.0.rc2)
multi_json
thor (>= 0.19)

Expand Down Expand Up @@ -102,7 +102,7 @@ GEM
lint_roller (~> 1.0)
rubocop-performance (~> 1.16.0)
strscan (3.1.0)
thor (1.3.1)
thor (1.3.2)
unicode-display_width (2.5.0)
webmock (3.23.1)
addressable (>= 2.8.0)
Expand Down
4 changes: 2 additions & 2 deletions gemfiles/Gemfile.elasticsearch-8.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc1)
esse (0.4.0.rc2)
multi_json
thor (>= 0.19)

Expand Down Expand Up @@ -101,7 +101,7 @@ GEM
lint_roller (~> 1.0)
rubocop-performance (~> 1.16.0)
strscan (3.1.0)
thor (1.3.1)
thor (1.3.2)
unicode-display_width (2.5.0)
webmock (3.23.1)
addressable (>= 2.8.0)
Expand Down
4 changes: 2 additions & 2 deletions gemfiles/Gemfile.opensearch-1.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc1)
esse (0.4.0.rc2)
multi_json
thor (>= 0.19)

Expand Down Expand Up @@ -101,7 +101,7 @@ GEM
lint_roller (~> 1.0)
rubocop-performance (~> 1.16.0)
strscan (3.1.0)
thor (1.3.1)
thor (1.3.2)
unicode-display_width (2.5.0)
webmock (3.23.1)
addressable (>= 2.8.0)
Expand Down
4 changes: 2 additions & 2 deletions gemfiles/Gemfile.opensearch-2.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc1)
esse (0.4.0.rc2)
multi_json
thor (>= 0.19)

Expand Down Expand Up @@ -101,7 +101,7 @@ GEM
lint_roller (~> 1.0)
rubocop-performance (~> 1.16.0)
strscan (3.1.0)
thor (1.3.1)
thor (1.3.2)
unicode-display_width (2.5.0)
webmock (3.23.1)
addressable (>= 2.8.0)
Expand Down
13 changes: 13 additions & 0 deletions lib/esse/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@ def initialize
end
end

class BulkResponseError < ::Esse::Error
attr_reader :response

def initialize(response)
@response = response
super(response)
end

def items
response.fetch('items', []).select { |item| item.values.dig(0, 'error') }
end
end

ES_TRANSPORT_ERRORS = {
'MultipleChoices' => 'MultipleChoicesError', # 300
'MovedPermanently' => 'MovedPermanentlyError', # 301
Expand Down
4 changes: 1 addition & 3 deletions lib/esse/import/bulk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ def each_request(max_retries: 4, last_retry_in_small_chunks: true)
requests.each do |request|
next unless request.body?
resp = yield request
if resp&.[]('errors')
raise resp&.fetch('items', [])&.select { |item| item.values.first['error'] }&.join("\n")
end
raise Esse::Transport::BulkResponseError.new(resp) if resp&.[]('errors')
end
rescue Faraday::TimeoutError, Esse::Transport::RequestTimeoutError => e
retry_count += 1
Expand Down
25 changes: 24 additions & 1 deletion lib/esse/repository/documents.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,31 @@ def import(**kwargs)
def update_documents_attribute(name, ids_or_doc_headers = [], kwargs = {})
batch = documents_for_lazy_attribute(name, ids_or_doc_headers)
return if batch.empty?
kwargs = kwargs.transform_keys(&:to_sym)

index.bulk(**kwargs.transform_keys(&:to_sym), update: batch)
if kwargs.delete(:index_on_missing) { true }
begin
index.bulk(**kwargs, update: batch)
rescue Esse::Transport::BulkResponseError => ex
ids = ex.items.map { |item| item.dig('update', '_id') }.compact
raise ex if ids.empty?

each_serialized_batch(eager_load_lazy_attributes: false, preload_lazy_attributes: false, id: ids) do |entries|
entries.each do |entry|
partial_doc = batch.find { |doc| doc.eql?(entry, match_lazy_doc_header: true) }
next unless partial_doc

partial_doc.source.each do |attr_name, attr_value|
entry.mutate(attr_name) { attr_value }
end
end

index.bulk(**kwargs, index: entries)
end
end
else
index.bulk(**kwargs, update: batch)
end
end

def documents_for_lazy_attribute(name, ids_or_doc_headers)
Expand Down
2 changes: 1 addition & 1 deletion lib/esse/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Esse
VERSION = '0.4.0.rc1'
VERSION = '0.4.0.rc2'
end
6 changes: 6 additions & 0 deletions spec/support/shared_contexts/geos_index_definition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ def source
collection do |**context, &block|
dataset.fetch(:state).each do |batch|
states = context[:conditions] ? batch.select(&context[:conditions]) : batch
if (ids = Esse::ArrayUtils.wrap(context[:id]).map(&:to_i)).any?
states = states.select { |s| ids.include?(s[:id]) }
end
block.call(states, **context) unless states.empty?
end
end
Expand All @@ -79,6 +82,9 @@ def source
collection do |**context, &block|
dataset.fetch(:county).each do |batch|
counties = context[:conditions] ? batch.select(&context[:conditions]) : batch
if (ids = Esse::ArrayUtils.wrap(context[:id]).map(&:to_i)).any?
counties = counties.select { |s| ids.include?(s[:id]) }
end
block.call(counties, **context) unless counties.empty?
end
end
Expand Down
3 changes: 3 additions & 0 deletions spec/support/shared_contexts/stories_index_definition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
repository :story do
collection do |**context, &block|
stories = context[:conditions] ? ds.select(&context[:conditions]) : ds
if (ids = Esse::ArrayUtils.wrap(context[:id]).map(&:to_i)).any?
stories = stories.select { |s| ids.include?(s[:id]) }
end
block.call(stories, **context) unless stories.empty?
end
document do |story, **context|
Expand Down
3 changes: 3 additions & 0 deletions spec/support/shared_contexts/venues_index_definition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
stub_index(:venues) do
repository :default do
collection do |**context, &block|
if (ids = Esse::ArrayUtils.wrap(context[:id]).map(&:to_i)).any?
ds = ds.select { |s| ids.include?(s[:id]) }
end
block.call(ds, **context) unless ds.empty?
end
document do |venue, **context|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,49 @@
end
end

context 'when document does not exist' do
include_context 'with stories index definition'

it 'raises the Esse::Transport::BulkResponseError error when passing index_on_missing: false' do
es_client do |client, _conf, cluster|
StoriesIndex.create_index(alias: true)

expect {
StoriesIndex::Story.update_documents_attribute(:tags, [
{ _id: '1001', routing: 'nyt' },
], index_on_missing: false)
}.to raise_error(Esse::Transport::BulkResponseError)
end
end

it 'retries importing the document when passing index_on_missing: true' do
es_client do |client, _conf, cluster|
StoriesIndex.create_index(alias: true)

resp = nil
expect {
resp = StoriesIndex::Story.import(context: { conditions: ->(s) { s[:id] == 1_001 } })
}.not_to raise_error

StoriesIndex.refresh
expect(StoriesIndex.count).to eq(1)

expect {
resp = StoriesIndex::Story.update_documents_attribute(:tags, [
{ _id: '1002', routing: 'nyt' },
{ _id: '1003', routing: 'nyt' }
], refresh: true)
}.not_to raise_error(Esse::Transport::BulkResponseError)

StoriesIndex.refresh
expect(StoriesIndex.count).to eq(nyt_stories.size)

doc = StoriesIndex.get(id: '1002', routing: 'nyt')
expect(doc.dig('_source', 'publication')).to eq('nyt')
end
end
end

context 'when the index does not have routing' do
include_context 'with geos index definition'

Expand Down

0 comments on commit 0146d4f

Please sign in to comment.