feat: add a final attempt that imports each document individually when all bulk attempts fail with a timeout error
marcosgz committed Jul 5, 2024
1 parent ebbe801 commit 1df7622
Showing 3 changed files with 45 additions and 5 deletions.
3 changes: 3 additions & 0 deletions .rubocop.yml
@@ -36,3 +36,6 @@ Style/StringLiterals:

Style/StringLiteralsInInterpolation:
EnforcedStyle: single_quotes

+RSpec/ExampleLength:
+  Max: 20
18 changes: 16 additions & 2 deletions lib/esse/import/bulk.rb
@@ -22,12 +22,12 @@ def initialize(type: nil, index: nil, delete: nil, create: nil)
# Return an array of RequestBody instances
#
# In case of timeout error, will retry with an exponential backoff using the following formula:
-# wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1)) seconds. It will retry up to max_retries times that is default 3.
+# wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1)) seconds. It will retry up to max_retries times that is default 4.
#
# Too large bulk requests will be split into multiple requests with only one attempt.
#
# @yield [RequestBody] A request body instance
-def each_request(max_retries: 3)
+def each_request(max_retries: 4, last_retry_in_small_chunks: true)
# @TODO create indexes when by checking all the index suffixes (if mapping is not empty)
requests = [optimistic_request]
retry_count = 0
@@ -43,6 +43,8 @@ def each_request(max_retries: 3)
rescue Faraday::TimeoutError, Esse::Transport::RequestTimeoutError => e
retry_count += 1
raise Esse::Transport::RequestTimeoutError.new(e.message) if retry_count >= max_retries
+# Timeout error may be caused by a too large request, so we split the requests in small chunks as a last attempt
+requests = requests_in_small_chunks if last_retry_in_small_chunks && max_retries > 2 && retry_count == max_retries - 2
wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1))
Esse.logger.warn "Timeout error, retrying in #{wait_interval} seconds"
sleep(wait_interval)
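
For context, the backoff formula above grows steeply with each retry. A minimal sketch of the schedule it produces (illustrative only, not part of the commit; rand(10) adds jitter, so each retry waits somewhere in a range):

(1..3).each do |retry_count|
  base = (retry_count**4) + 15
  max_jitter = 9 * (retry_count + 1)
  puts "retry #{retry_count}: #{base}..#{base + max_jitter} seconds"
end
# retry 1: 16..34 seconds
# retry 2: 31..58 seconds
# retry 3: 96..132 seconds
# With the new default of max_retries: 4, the fourth timeout re-raises instead of sleeping.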
@@ -73,6 +75,18 @@ def optimistic_request
request
end

+def requests_in_small_chunks(chunk_size: 1)
+  arr = []
+  @delete.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.delete = slice } }
+  @create.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.create = slice } }
+  @index.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.index = slice } }
+  Esse.logger.warn <<~MSG
+    Retrying the last request in small chunks of #{chunk_size} documents.
+    This is a last resort to avoid timeout errors, consider increasing the bulk size or reducing the batch size.
+  MSG
+  arr
+end

# @return [Array<RequestBody>]
def balance_requests_size(err)
if (bulk_size = err.message.scan(/exceeded.(\d+).bytes/).dig(0, 0).to_i) > 0
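A minimal usage sketch of the new keyword (illustrative, not part of the commit; `documents`, `client`, and the #body reader on the yielded request are assumptions):

bulk = Esse::Import::Bulk.new(index: documents)
bulk.each_request(max_retries: 4, last_retry_in_small_chunks: true) do |request|
  # Each yielded request wraps a bulk payload. Near the end of the retry
  # budget the pending documents are re-enqueued one per request
  # (chunk_size: 1), so a single oversized document can no longer time
  # out the whole batch.
  client.bulk(body: request.body)
end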
29 changes: 26 additions & 3 deletions spec/esse/import/bulk_spec.rb
@@ -7,7 +7,7 @@
let(:index) { [Esse::HashDocument.new(id: 1, source: { foo: 'bar' })] }
let(:create) { [Esse::HashDocument.new(id: 2, source: { foo: 'bar' })] }
let(:delete) { [Esse::HashDocument.new(id: 3, source: { foo: 'bar' })] }
-let(:bulk) { Esse::Import::Bulk.new(index: index, create: create, delete: delete) }
+let(:bulk) { described_class.new(index: index, create: create, delete: delete) }

it 'yields a request body instance' do
expect { |b| bulk.each_request(&b) }.to yield_with_args(Esse::Import::RequestBodyAsJson)
@@ -18,7 +18,7 @@
expect(Esse.logger).to receive(:warn).with(an_instance_of(String)).twice
retries = 0
expect {
-bulk.each_request(max_retries: 3) { |request|
+bulk.each_request(max_retries: 3, last_retry_in_small_chunks: false) { |request|
retries += 1
raise Faraday::TimeoutError
}
@@ -31,7 +31,7 @@
expect(Esse.logger).to receive(:warn).with(an_instance_of(String)).twice
retries = 0
expect {
-bulk.each_request(max_retries: 3) { |request|
+bulk.each_request(max_retries: 3, last_retry_in_small_chunks: false) { |request|
retries += 1
raise Esse::Transport::RequestTimeoutError
}
@@ -49,6 +49,29 @@
end
end

+context 'when on last retry and last_retry_in_small_chunks is true' do
+  let(:index) do
+    %w[foo bar baz].each_with_index.map { |name, idx| Esse::HashDocument.new(id: idx + 10, source: { name: name }) }
+  end
+  let(:create) do
+    %w[foo bar baz].each_with_index.map { |name, idx| Esse::HashDocument.new(id: idx + 20, source: { name: name }) }
+  end
+  let(:delete) do
+    %w[foo bar baz].each_with_index.map { |name, idx| Esse::HashDocument.new(id: idx + 30, source: { name: name }) }
+  end
+  let(:bulk) { described_class.new(index: index, create: create, delete: delete) }
+
+  it 'retries in small chunks' do
+    expect(bulk).to receive(:sleep).with(an_instance_of(Integer)).exactly(3).times
+    requests = []
+    bulk.each_request(last_retry_in_small_chunks: true) { |request|
+      requests << request
+      raise Faraday::TimeoutError if [1, 2, 3].include?(requests.size)
+    }
+    expect(requests.size).to eq(3 + index.size + create.size + delete.size)
+  end
+end

context 'with a request entity too large error' do
let(:index) { [Esse::HashDocument.new(id: 1, source: { name: 'Aaa' * 30 })] }
let(:create) { [Esse::HashDocument.new(id: 2, source: { name: 'Bbbb' * 100 })] }
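Why the chunked example expects 3 + index.size + create.size + delete.size (= 12) yields: a walkthrough of the attempt sequence, assuming the rescue clause ends with a retry (that line sits in the collapsed part of the diff):

# Attempt 1: the single optimistic request is yielded -> raises (requests.size == 1)
# Attempt 2: the same request is yielded again -> raises (size == 2); retry_count now
#            equals max_retries - 2, so the queue is rebuilt as 9 one-document requests
# Attempt 3: the first chunked request is yielded -> raises (size == 3)
# Attempt 4: all 9 chunked requests are yielded and succeed (sizes 4..12)
# => 3 failed yields + 9 successful yields == 12, and sleep was called 3 times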
