Skip to content

Commit

Permalink
feat: refactor by reusing each_serialized_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosgz committed Aug 23, 2024
1 parent 5286311 commit 0d7142a
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 87 deletions.
6 changes: 6 additions & 0 deletions lib/esse/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,10 @@ def self.document?(object)

!!(object.is_a?(Esse::Document) && object.id)
end

# Compares a document against a raw header tuple (id, routing, type) and
# reports whether they identify the same Elasticsearch document.
#
# The id comparison is string-based, so Integer and String ids match each
# other. Types match when they are equal, or when both belong to
# LazyDocumentHeader::ACCEPTABLE_DOC_TYPES (interchangeable default types).
#
# @param document [Esse::Document] the serialized document to check
# @param id [Object, nil] the header id (nil/absent id never matches)
# @param routing [String, nil] the header routing value
# @param type [String, nil] the header document type
# @return [Boolean] truthy when document and header refer to the same doc
def self.document_match_with_header?(document, id, routing, type)
  id &&
    id.to_s == document.id.to_s &&
    routing == document.routing &&
    (document.type == type ||
      (LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(document.type) &&
        LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(type)))
end
end
14 changes: 10 additions & 4 deletions lib/esse/document.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

module Esse
class Document
MUTATIONS_FALLBACK = {}.freeze

attr_reader :object, :options

def initialize(object, **options)
Expand Down Expand Up @@ -102,6 +104,10 @@ def doc_header
end
end

# Wraps this document into a DocumentForPartialUpdate that keeps this
# document's identity but carries only the given +source+ hash as payload.
# NOTE(review): presumably consumed by the bulk `update` path when syncing
# lazy attributes — confirm against Index#import.
#
# @param source [Hash] the partial source to send in the update request
# @return [Esse::DocumentForPartialUpdate]
def document_for_partial_update(source)
  DocumentForPartialUpdate.new(self, source: source)
end

def inspect
attributes = %i[id routing source].map do |attr|
value = send(attr)
Expand All @@ -120,14 +126,14 @@ def mutate(key)
instance_variable_set(:@__mutated_source__, nil)
end

# Attribute values applied through #mutate, keyed by attribute name.
# Returns the shared frozen MUTATIONS_FALLBACK hash when nothing has been
# mutated, so callers can always treat the result as a Hash.
#
# @return [Hash] the applied mutations (possibly the frozen empty fallback)
def mutations
  return MUTATIONS_FALLBACK unless @__mutations__

  @__mutations__
end

# The document #source with any #mutate-d attribute values merged on top.
# The merged hash is memoized in @__mutated_source__; documents without
# mutations return #source directly (no extra allocation).
#
# @return [Hash] source merged with mutations, or plain source
def mutated_source
  if @__mutations__
    @__mutated_source__ ||= source.merge(@__mutations__)
  else
    source
  end
end

def document_for_partial_update(source)
DocumentForPartialUpdate.new(self, source: source)
end
end
end
95 changes: 20 additions & 75 deletions lib/esse/index/documents.rb
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,6 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l
update_lazy_attributes = options.delete(:lazy_update_document_attributes)
end

doc_header_check = ->(doc, (id, routing, type)) do
id && id.to_s == doc.id.to_s &&
routing == doc.routing &&
(LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(doc.type) && LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(type) || doc.type == type)
end

repo_hash.slice(*repo_types).each do |repo_name, repo|
# Elasticsearch 6.x and older have multiple types per index.
# This gem supports multiple types per index for backward compatibility, but we recommend to update
Expand All @@ -274,83 +268,34 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l
bulk_kwargs = { suffix: suffix, type: repo_name, **options }
cluster.may_update_type!(bulk_kwargs)

lazy_attrs_to_eager_load = repo.lazy_document_attribute_names(eager_load_lazy_attributes)
lazy_attrs_to_search_preload = repo.lazy_document_attribute_names(preload_lazy_attributes)
lazy_attrs_to_update_after = repo.lazy_document_attribute_names(update_lazy_attributes)
lazy_attrs_to_update_after -= lazy_attrs_to_eager_load
lazy_attrs_to_search_preload -= lazy_attrs_to_eager_load

# @TODO Refactor this by combining the upcoming code again with repo.each_serialized_batch as it was before:
# context[:eager_load_lazy_attributes] = lazy_attrs_to_eager_load if lazy_attrs_to_eager_load.any?
# repo.each_serialized_batch(**context) do |batch|
# bulk(**bulk_kwargs, index: batch)

# lazy_attrs_to_update_after.each do |attr_name|
# partial_docs = repo.documents_for_lazy_attribute(attr_name, batch.reject(&:ignore_on_index?))
# next if partial_docs.empty?

# bulk(**bulk_kwargs, update: partial_docs)
# end
# count += batch.size
# end
context ||= {}
repo.send(:each_batch, **context) do |*args|
batch, collection_context = args
collection_context ||= {}
entries = [*batch].map { |entry| repo.serialize(entry, **collection_context) }.compact
context[:eager_load_lazy_attributes] = eager_load_lazy_attributes
context[:preload_lazy_attributes] = preload_lazy_attributes
repo.each_serialized_batch(**context) do |batch|
bulk(**bulk_kwargs, index: batch)

lazy_attrs_to_eager_load.each do |attr_name|
repo.retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value|
doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) }
doc&.mutate(attr_name) { value }
end
end

preload_search_result = Hash.new { |h, arr_name| h[arr_name] = {} }
if lazy_attrs_to_search_preload.any?
entries.group_by(&:routing).each do |routing, docs|
search_request = { query: {ids: {values: entries.map(&:id)} }, _source: lazy_attrs_to_search_preload }
search_request[:routing] = routing if routing
hits = repo.index.search(**search_request).response.hits
hits.each do |hit|
header = [hit['_id'], hit['_routing'], hit['_type']]
next if header[0].nil?
if update_lazy_attributes != false
attrs = repo.lazy_document_attribute_names(update_lazy_attributes)
attrs -= repo.lazy_document_attribute_names(eager_load_lazy_attributes)
update_attrs = attrs.each_with_object(Hash.new { |h, k| h[k] = {} }) do |attr_name, memo|
filtered_docs = batch.reject do |doc|
doc.ignore_on_index? || doc.mutations.key?(attr_name)
end
next if filtered_docs.empty?

hit.dig('_source')&.each do |attr_name, attr_value|
real_attr_name = repo.lazy_document_attribute_names(attr_name).first
preload_search_result[real_attr_name][header] = attr_value
end
repo.retrieve_lazy_attribute_values(attr_name, filtered_docs).each do |doc, value|
memo[doc.doc_header][attr_name] = value
end
preload_search_result.each do |attr_name, values|
values.each do |header, value|
doc = entries.find { |d| doc_header_check.call(d, header) }
doc&.mutate(attr_name) { value }
end
end
if update_attrs.any?
bulk_update = update_attrs.map do |header, values|
header.merge(data: {doc: values})
end
bulk(**bulk_kwargs, update: bulk_update)
end
end

bulk(**bulk_kwargs, index: entries)

update_lazy_attrs = Hash.new { |h, k| h[k] = {} }
lazy_attrs_to_update_after.each do |attr_name|
preloaded_ids = preload_search_result[attr_name].keys
filtered_docs = entries.reject do |doc|
doc.ignore_on_index? || preloaded_ids.any? { |header| doc_header_check.call(doc, header) }
end
next if filtered_docs.empty?

repo.retrieve_lazy_attribute_values(attr_name, filtered_docs).map do |doc, datum|
update_lazy_attrs[doc.doc_header][attr_name] = datum
end
end
if update_lazy_attrs.any?
bulk_update = update_lazy_attrs.map do |header, values|
header.merge(data: {doc: values})
end
bulk(**bulk_kwargs, update: bulk_update)
end
count += entries.size
count += batch.size
end
end
count
Expand Down
39 changes: 32 additions & 7 deletions lib/esse/repository/object_document_mapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,46 @@ def collection_class
# @param [Hash] kwargs The context
# @return [Enumerator] The enumerator
# @yield [Array, **context] serialized collection and the optional context from the collection
def each_serialized_batch(eager_load_lazy_attributes: false, **kwargs)
def each_serialized_batch(eager_load_lazy_attributes: false, preload_lazy_attributes: false, **kwargs)
if kwargs.key?(:lazy_attributes)
warn 'The `lazy_attributes` option is deprecated. Use `eager_load_lazy_attributes` instead.'
eager_load_lazy_attributes = kwargs.delete(:lazy_attributes)
end

lazy_attrs_to_eager_load = lazy_document_attribute_names(eager_load_lazy_attributes)
lazy_attrs_to_search_preload = lazy_document_attribute_names(preload_lazy_attributes)
lazy_attrs_to_search_preload -= lazy_attrs_to_eager_load

each_batch(**kwargs) do |*args|
batch, collection_context = args
collection_context ||= {}
entries = [*batch].map { |entry| serialize(entry, **collection_context) }.compact
if eager_load_lazy_attributes
attrs = eager_load_lazy_attributes.is_a?(Array) ? eager_load_lazy_attributes : lazy_document_attribute_names(eager_load_lazy_attributes)
attrs.each do |attr_name|
retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value|
doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) }
doc&.mutate(attr_name) { value }
lazy_attrs_to_eager_load.each do |attr_name|
retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value|
doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) }
doc&.mutate(attr_name) { value }
end
end

if lazy_attrs_to_search_preload.any?
entries.group_by(&:routing).each do |routing, docs|
search_request = {
query: { ids: { values: docs.map(&:id) } },
size: docs.size,
_source: lazy_attrs_to_search_preload
}
search_request[:routing] = routing if routing
index.search(**search_request).response.hits.each do |hit|
header = [hit['_id'], hit['_routing'], hit['_type']]
next if header[0].nil?

hit.dig('_source')&.each do |attr_name, attr_value|
real_attr_name = lazy_document_attribute_names(attr_name).first
next if real_attr_name.nil?

doc = entries.find { |d| Esse.document_match_with_header?(d, *header) }
doc&.mutate(real_attr_name) { attr_value }
end
end
end
end
Expand Down
58 changes: 57 additions & 1 deletion spec/esse/repository/document_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@
end
end

context 'with lazy_load_attributes' do
context 'with eager_load_lazy_attributes' do
include_context 'with stories index definition'

it 'yields serialized objects with lazy attributes when passing eager_load_lazy_attributes: true' do
Expand Down Expand Up @@ -187,6 +187,62 @@
expect(expected_data.select { |doc| doc.to_h.key?(:tags) && !doc.to_h.key?(:tags_count) }).not_to be_empty
end
end

# Covers the preload_lazy_attributes flow of each_serialized_batch: lazy
# attribute values are fetched back from the index via _search (one request
# per routing partition) and applied to the serialized documents as
# mutations, instead of being recomputed from the collection.
context 'with preload_lazy_attributes' do
  include_context 'with stories index definition'

  # Stubbed search hits for the 'nyt' routing partition, mirroring the
  # Elasticsearch hit shape ('_id'/'_routing'/'_type'/'_source') that the
  # preload step reads.
  let(:nyt_hits) do
    nyt_stories.map do |hash|
      {
        '_id' => hash[:id].to_s,
        '_routing' => 'nyt',
        '_type' => '_doc',
        '_source' => {
          'tags' => hash[:tags],
          'tags_count' => hash[:tags].size,
        },
      }
    end
  end

  # Same shape as nyt_hits, for the 'wsj' routing partition.
  let(:wsj_hits) do
    wsj_stories.map do |hash|
      {
        '_id' => hash[:id].to_s,
        '_routing' => 'wsj',
        '_type' => '_doc',
        '_source' => {
          'tags' => hash[:tags],
          'tags_count' => hash[:tags].size,
        },
      }
    end
  end

  it 'yields serialized objects with lazy attributes when passing preload_lazy_attributes: true' do
    # One ids-query search per routing value, restricted via _source to the
    # lazy attributes being preloaded.
    expect(StoriesIndex).to receive(:search).with(
      query: { ids: { values: nyt_stories.map { |hash| hash[:id] } } },
      size: nyt_stories.size,
      _source: %i[tags tags_count],
      routing: 'nyt',
    ).and_return(double(response: double(hits: nyt_hits)))
    expect(StoriesIndex).to receive(:search).with(
      query: { ids: { values: wsj_stories.map { |hash| hash[:id] } } },
      size: wsj_stories.size,
      _source: %i[tags tags_count],
      routing: 'wsj',
    ).and_return(double(response: double(hits: wsj_hits)))
    expected_data = []
    expect {
      StoriesIndex::Story.each_serialized_batch(preload_lazy_attributes: true) do |batch|
        expected_data.push(*batch)
      end
    }.not_to raise_error
    # Preloaded values must land in Document#mutations for every document.
    expect(expected_data.map(&:mutations)).to all(
      include(tags: be_a(Array), tags_count: be_a(Integer)),
    )
  end
end
end

describe '.documents' do
Expand Down

0 comments on commit 0d7142a

Please sign in to comment.