diff --git a/.rubocop.yml b/.rubocop.yml
index d8c9e2b..0c4ee6b 100644
--- a/.rubocop.yml
+++ b/.rubocop.yml
@@ -36,3 +36,6 @@ Style/StringLiterals:
 
 Style/StringLiteralsInInterpolation:
   EnforcedStyle: single_quotes
+
+RSpec/ExampleLength:
+  Max: 20
diff --git a/lib/esse/import/bulk.rb b/lib/esse/import/bulk.rb
index 00a6b7b..67a020a 100644
--- a/lib/esse/import/bulk.rb
+++ b/lib/esse/import/bulk.rb
@@ -22,12 +22,12 @@ def initialize(type: nil, index: nil, delete: nil, create: nil)
       # Return an array of RequestBody instances
       #
       # In case of timeout error, will retry with an exponential backoff using the following formula:
-      # wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1)) seconds. It will retry up to max_retries times that is default 3.
+      # wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1)) seconds. It will retry up to max_retries times that is default 4.
       #
       # Too large bulk requests will be split into multiple requests with only one attempt.
       #
       # @yield [RequestBody] A request body instance
-      def each_request(max_retries: 3)
+      def each_request(max_retries: 4, last_retry_in_small_chunks: true)
        # @TODO create indexes when by checking all the index suffixes (if mapping is not empty)
        requests = [optimistic_request]
        retry_count = 0
@@ -43,6 +43,8 @@ def each_request(max_retries: 3)
       rescue Faraday::TimeoutError, Esse::Transport::RequestTimeoutError => e
         retry_count += 1
         raise Esse::Transport::RequestTimeoutError.new(e.message) if retry_count >= max_retries
+        # Timeout error may be caused by a too large request, so we split the requests in small chunks as a last attempt
+        requests = requests_in_small_chunks if last_retry_in_small_chunks && max_retries > 2 && retry_count == max_retries - 2
         wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1))
         Esse.logger.warn "Timeout error, retrying in #{wait_interval} seconds"
         sleep(wait_interval)
@@ -73,6 +75,18 @@ def optimistic_request
         request
       end
 
+      def requests_in_small_chunks(chunk_size: 1)
+        arr = []
+        @delete.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.delete = slice } }
+        @create.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.create = slice } }
+        @index.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.index = slice } }
+        Esse.logger.warn <<~MSG
+          Retrying the last request in small chunks of #{chunk_size} documents.
+          This is a last resort to avoid timeout errors, consider increasing the bulk size or reducing the batch size.
+        MSG
+        arr
+      end
+
       # @return [Array]
       def balance_requests_size(err)
         if (bulk_size = err.message.scan(/exceeded.(\d+).bytes/).dig(0, 0).to_i) > 0
diff --git a/spec/esse/import/bulk_spec.rb b/spec/esse/import/bulk_spec.rb
index 6dbefbf..2a1d8be 100644
--- a/spec/esse/import/bulk_spec.rb
+++ b/spec/esse/import/bulk_spec.rb
@@ -7,7 +7,7 @@
     let(:index) { [Esse::HashDocument.new(id: 1, source: { foo: 'bar' })] }
     let(:create) { [Esse::HashDocument.new(id: 2, source: { foo: 'bar' })] }
     let(:delete) { [Esse::HashDocument.new(id: 3, source: { foo: 'bar' })] }
-    let(:bulk) { Esse::Import::Bulk.new(index: index, create: create, delete: delete) }
+    let(:bulk) { described_class.new(index: index, create: create, delete: delete) }
 
     it 'yields a request body instance' do
       expect { |b| bulk.each_request(&b) }.to yield_with_args(Esse::Import::RequestBodyAsJson)
@@ -18,7 +18,7 @@
       expect(Esse.logger).to receive(:warn).with(an_instance_of(String)).twice
       retries = 0
       expect {
-        bulk.each_request(max_retries: 3) { |request|
+        bulk.each_request(max_retries: 3, last_retry_in_small_chunks: false) { |request|
           retries += 1
           raise Faraday::TimeoutError
         }
@@ -31,7 +31,7 @@
       expect(Esse.logger).to receive(:warn).with(an_instance_of(String)).twice
       retries = 0
       expect {
-        bulk.each_request(max_retries: 3) { |request|
+        bulk.each_request(max_retries: 3, last_retry_in_small_chunks: false) { |request|
           retries += 1
           raise Esse::Transport::RequestTimeoutError
         }
@@ -49,6 +49,29 @@
       end
     end
 
+    context 'when on last retry and last_retry_in_small_chunks is true' do
+      let(:index) do
+        %w[foo bar baz].each_with_index.map { |name, idx| Esse::HashDocument.new(id: idx + 10, source: { name: name }) }
+      end
+      let(:create) do
+        %w[foo bar baz].each_with_index.map { |name, idx| Esse::HashDocument.new(id: idx + 20, source: { name: name }) }
+      end
+      let(:delete) do
+        %w[foo bar baz].each_with_index.map { |name, idx| Esse::HashDocument.new(id: idx + 30, source: { name: name }) }
+      end
+      let(:bulk) { described_class.new(index: index, create: create, delete: delete) }
+
+      it 'retries in small chunks' do
+        expect(bulk).to receive(:sleep).with(an_instance_of(Integer)).exactly(3).times
+        requests = []
+        bulk.each_request(last_retry_in_small_chunks: true) { |request|
+          requests << request
+          raise Faraday::TimeoutError if [1, 2, 3].include?(requests.size)
+        }
+        expect(requests.size).to eq(3 + index.size + create.size + delete.size)
+      end
+    end
+
     context 'with a request entity too large error' do
       let(:index) { [Esse::HashDocument.new(id: 1, source: { name: 'Aaa' * 30 })] }
      let(:create) { [Esse::HashDocument.new(id: 2, source: { name: 'Bbbb' * 100 })] }
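For context on the retry timing documented in `each_request`, here is a small sketch (not part of the patch) that evaluates the backoff formula for the first few retries; the min/max bounds simply substitute the extremes of `rand(10)`, which returns 0 through 9:

```ruby
# Illustration only: the backoff formula from each_request,
# wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1))
(1..3).each do |retry_count|
  min_wait = (retry_count**4) + 15                            # rand(10) == 0
  max_wait = (retry_count**4) + 15 + (9 * (retry_count + 1))  # rand(10) == 9
  puts format('retry %d: %d..%d seconds', retry_count, min_wait, max_wait)
end
# retry 1: 16..34 seconds
# retry 2: 31..58 seconds
# retry 3: 96..132 seconds
```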
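A minimal usage sketch of the new keyword, assuming `docs_to_index`, `docs_to_create`, and `docs_to_delete` are arrays of `Esse::HashDocument` built by the caller (these names are placeholders, not part of the library):

```ruby
bulk = Esse::Import::Bulk.new(
  index: docs_to_index,
  create: docs_to_create,
  delete: docs_to_delete
)

# With the defaults introduced by this patch (max_retries: 4, last_retry_in_small_chunks: true),
# the second-to-last retry re-yields the payload split into one-document chunks.
bulk.each_request do |request|
  # `request` is an Esse::Import::RequestBodyAsJson; send it through your transport here.
end

# Opting out keeps the previous behaviour of retrying the same payload every time:
bulk.each_request(last_retry_in_small_chunks: false) do |request|
  # ...
end
```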