Skip to content

Commit

Permalink
feat: improve the index reset task by polling the task id
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosgz committed Oct 4, 2024
1 parent 581bbbc commit 3d54b86
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 11 deletions.
24 changes: 19 additions & 5 deletions lib/esse/index/indices.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,11 @@ def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: tru

suffix ||= Esse.timestamp
suffix = Esse.timestamp while index_exist?(suffix: suffix)
syncronous_import = true
syncronous_import = false if reindex.is_a?(Hash) && reindex[:wait_for_completion] == false

if optimize && import
optimized_creation = optimize && syncronous_import && (import || reindex)
if optimized_creation
definition = [settings_hash(settings: settings), mappings_hash].reduce(&:merge)
number_of_replicas = definition.dig(Esse::SETTING_ROOT_KEY, :index, :number_of_replicas)
refresh_interval = definition.dig(Esse::SETTING_ROOT_KEY, :index, :refresh_interval)
Expand Down Expand Up @@ -84,21 +87,32 @@ def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: tru
end
end

if optimize && import && number_of_replicas != new_number_of_replicas || refresh_interval != new_refresh_interval
if optimized_creation && number_of_replicas != new_number_of_replicas || refresh_interval != new_refresh_interval
update_settings(suffix: suffix, settings: settings)
refresh(suffix: suffix)
end

update_aliases(suffix: suffix)
update_aliases(suffix: suffix) if syncronous_import

true
end

# Copies documents from a source to a destination.
#
# To avoid http timeout, we are sending the request with `wait_for_completion: false` and polling the task
# until it is completed.
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html
def reindex(body: , wait_for_completion: false, scroll: '30m', **options)
cluster.api.reindex(**options, body: body, scroll: scroll, wait_for_completion: wait_for_completion)
def reindex(body:, wait_for_completion: true, scroll: '30m', poll_interval: 5, **options)
resp = cluster.api.reindex(**options, body: body, scroll: scroll, wait_for_completion: false)
return resp unless wait_for_completion

task_id = resp['task']
task = nil
while (task = cluster.api.task(id: task_id))['completed'] == false
sleep poll_interval
end
task
end

# Checks the index existance. Returns true or false
Expand Down
20 changes: 14 additions & 6 deletions spec/support/shared_examples/index_reset_index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,29 +82,37 @@
end
end

it 'reindex data from the old index to the new index' do
it 'create async task to reindex data from the old index and do not update the alias' do
es_client do |client, _conf, cluster|
GeosIndex.create_index(alias: true, suffix: '2021')
GeosIndex.import(refresh: true)

expect {
GeosIndex.reset_index(suffix: index_suffix, import: false, reindex: true, refresh: true)
GeosIndex.reset_index(suffix: index_suffix, import: false, reindex: { wait_for_completion: false }, refresh: true)
}.not_to raise_error

expect(GeosIndex.indices_pointing_to_alias).to eq(["#{GeosIndex.index_name}_#{index_suffix}"])
expect(GeosIndex.indices_pointing_to_alias).to eq(["#{GeosIndex.index_name}_2021"])
expect(GeosIndex.index_exist?(suffix: '2021')).to eq(true)
expect(GeosIndex.index_exist?(suffix: index_suffix)).to eq(true)
expect(GeosIndex.count).to be_positive

count = 0
(0..3).each do |t|
GeosIndex.refresh(suffix: index_suffix)
count = GeosIndex.count(suffix: index_suffix)
break if count.positive?
sleep(t) if t.positive?
end
expect(count).to be_positive
end
end

it 'forwads the reindex options to the reindex method' do
it 'reindex data from the old index to the new index by awaiting for completion' do
es_client do |client, _conf, cluster|
GeosIndex.create_index(alias: true, suffix: '2021')
GeosIndex.import(refresh: true)

expect {
GeosIndex.reset_index(suffix: index_suffix, import: false, reindex: { wait_for_completion: true }, refresh: true)
GeosIndex.reset_index(suffix: index_suffix, import: false, reindex: { wait_for_completion: true, poll_interval: 0.2 }, refresh: true)
}.not_to raise_error

expect(GeosIndex.indices_pointing_to_alias).to eq(["#{GeosIndex.index_name}_#{index_suffix}"])
Expand Down

0 comments on commit 3d54b86

Please sign in to comment.