Skip to content

Commit

Permalink
during import, use the doc directly should reduce amount of allocated…
Browse files Browse the repository at this point in the history
… objects
  • Loading branch information
marcosgz committed Aug 19, 2024
1 parent 4366b22 commit 69e67c7
Show file tree
Hide file tree
Showing 12 changed files with 260 additions and 86 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* Add preload_lazy_attributes option to the import in order to fetch the lazy attributes in a single query before bulk indexing

## 0.3.6 - 2024-08-07
* Esse::LazyDocumentHeader#to_doc return `Esse::LazyDocumentHeader::Document` instance to properly separate context metadata from document source
* Esse::LazyDocumentHeader#to_doc return `Esse::DocumentForPartialUpdate` instance to properly separate context metadata from document source
* Add `.collection_class` method to the `Esse::Repository` class to let external plugins and extensions to access it instead of read @collection_proc variable

## 0.3.5 - 2024-08-02
Expand Down
1 change: 1 addition & 0 deletions lib/esse/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module Esse
require_relative 'primitives'
require_relative 'collection'
require_relative 'document'
require_relative 'document_for_partial_update'
require_relative 'document_lazy_attribute'
require_relative 'lazy_document_header'
require_relative 'hash_document'
Expand Down
17 changes: 13 additions & 4 deletions lib/esse/document.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,16 @@ def ignore_on_delete?
id.nil?
end

def ==(other)
other.is_a?(self.class) && (
id == other.id && type == other.type && routing == other.routing && meta == other.meta && source == other.source
)
def eql?(other, match_lazy_doc_header: false)
if match_lazy_doc_header && other.is_a?(LazyDocumentHeader)
other.eql?(self)
else
other.is_a?(self.class) && (
id.to_s == other.id.to_s && type == other.type && routing == other.routing && meta == other.meta && source == other.source
)
end
end
alias_method :==, :eql?

def doc_header
{ _id: id }.tap do |h|
Expand Down Expand Up @@ -120,5 +125,9 @@ def mutated_source

@__mutated_source__ ||= source.merge(@__mutations__)
end

def document_for_partial_update(source)
DocumentForPartialUpdate.new(self, source: source)
end
end
end
16 changes: 16 additions & 0 deletions lib/esse/document_for_partial_update.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# frozen_string_literal: true

module Esse
class DocumentForPartialUpdate < Esse::Document
extend Forwardable

def_delegators :object, :id, :type, :routing, :options

attr_reader :source

def initialize(lazy_header, source:)
@source = source
super(lazy_header)
end
end
end
11 changes: 6 additions & 5 deletions lib/esse/index/documents.rb
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l
attrs = lazy_attrs_to_eager_load.is_a?(Array) ? lazy_attrs_to_eager_load : repo.lazy_document_attribute_names(lazy_attrs_to_eager_load)
attrs.each do |attr_name|
repo.retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value|
doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing }
doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) }
doc&.mutate(attr_name) { value }
end
end
Expand All @@ -262,16 +262,17 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l
if lazy_attrs_to_search_preload.any?
hits = repo.index.search(query: {ids: {values: entries.map(&:id)} }, _source: lazy_attrs_to_search_preload).response.hits
hits.each do |hit|
doc_header = Esse::LazyDocumentHeader.coerce(hit.slice('_id', '_routing')) # TODO Add '_type', when adjusting eql to tread _doc properly
next unless doc_header.valid?
doc_header = Esse::LazyDocumentHeader.coerce(hit.slice('_id', '_routing', '_type'))
next if doc_header.id.nil?

hit.dig('_source')&.each do |attr_name, attr_value|
real_attr_name = repo.lazy_document_attribute_names(attr_name).first
preload_search_result[real_attr_name][doc_header] = attr_value
end
end
preload_search_result.each do |attr_name, values|
values.each do |doc_header, value|
doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing }
doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) }
doc&.mutate(attr_name) { value }
end
end
Expand All @@ -282,7 +283,7 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l
lazy_attrs_to_update_after.each do |attr_name|
preloaded_ids = preload_search_result[attr_name].keys
filtered_docs = entries.reject do |doc|
doc.ignore_on_index? || preloaded_ids.any? { |d| d.id.to_s == doc.id.to_s && d.type == doc.type && d.routing == doc.routing }
doc.ignore_on_index? || preloaded_ids.any? { |d| doc.eql?(d, match_lazy_doc_header: true) }
end
next if filtered_docs.empty?

Expand Down
2 changes: 1 addition & 1 deletion lib/esse/index/object_document_mapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def each_serialized_batch(repo_name = nil, **kwargs, &block)
# @return [Enumerator] All serialized entries
def documents(repo_name = nil, **kwargs)
Enumerator.new do |yielder|
each_serialized_batch(repo_name, **kwargs) do |documents, **_collection_kargs|
each_serialized_batch(repo_name, **kwargs) do |documents|
documents.each { |document| yielder.yield(document) }
end
end
Expand Down
36 changes: 13 additions & 23 deletions lib/esse/lazy_document_header.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

module Esse
class LazyDocumentHeader
ACCEPTABLE_CLASSES = [Esse::LazyDocumentHeader, Esse::Document].freeze
ACCEPTABLE_DOC_TYPES = [nil, '_doc', 'doc'].freeze

def self.coerce_each(values)
arr = []
Esse::ArrayUtils.wrap(values).flatten.map do |value|
instance = coerce(value)
arr << instance if instance&.valid?
arr << instance if instance && !instance.id.nil?
end
arr
end
Expand All @@ -17,7 +20,7 @@ def self.coerce(value)
if value.is_a?(Esse::LazyDocumentHeader)
value
elsif value.is_a?(Esse::Document)
new(**value.options, id: value.id, type: value.type, routing: value.routing)
value
elsif value.is_a?(Hash)
resp = value.transform_keys do |key|
case key
Expand Down Expand Up @@ -47,37 +50,24 @@ def initialize(id:, type: nil, routing: nil, **extra_attributes)
@options = extra_attributes.freeze
end

def valid?
!id.nil?
end

def to_h
options.merge(_id: id).tap do |hash|
hash[:_type] = type if type
hash[:routing] = routing if routing
end
end

def to_doc(source = {})
Document.new(self, source: source)
def document_for_partial_update(source)
Esse::DocumentForPartialUpdate.new(self, source: source)
end

def eql?(other)
self.class == other.class && id == other.id && type == other.type && routing == other.routing
def eql?(other, **)
ACCEPTABLE_CLASSES.any? { |klass| other.is_a?(klass) } &&
id.to_s == other.id.to_s &&
routing == other.routing &&
((ACCEPTABLE_DOC_TYPES.include?(type) && ACCEPTABLE_DOC_TYPES.include?(other.type)) || type == other.type)
end
alias_method :==, :eql?

class Document < Esse::Document
extend Forwardable

def_delegators :object, :id, :type, :routing, :options

attr_reader :source

def initialize(lazy_header, source: {})
@source = source
super(lazy_header)
end
end
end
end

11 changes: 5 additions & 6 deletions lib/esse/repository/documents.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def update_documents_attribute(name, ids_or_doc_headers = [], kwargs = {})

def documents_for_lazy_attribute(name, ids_or_doc_headers)
retrieve_lazy_attribute_values(name, ids_or_doc_headers).map do |doc_header, datum|
doc_header.to_doc(name => datum)
doc_header.document_for_partial_update(name => datum)
end
end

Expand All @@ -36,11 +36,10 @@ def retrieve_lazy_attribute_values(name, ids_or_doc_headers)
return [] unless result.is_a?(Hash)

result.each_with_object({}) do |(key, value), memo|
if key.is_a?(LazyDocumentHeader) && (doc = docs.find { |d| d == key || d.id == key.id })
memo[doc] = value
elsif (doc = docs.find { |d| d.id == key })
memo[doc] = value
end
val = docs.find { |doc| doc.eql?(key, match_lazy_doc_header: true) || doc.id == key }
next unless val

memo[val] = value
end
end
end
Expand Down
6 changes: 3 additions & 3 deletions lib/esse/repository/object_document_mapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ def each_serialized_batch(lazy_attributes: false, **kwargs)
attrs = lazy_attributes.is_a?(Array) ? lazy_attributes : lazy_document_attribute_names(lazy_attributes)
attrs.each do |attr_name|
retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value|
doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing }
doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) }
doc&.mutate(attr_name) { value }
end
end
end

yield entries, **collection_context
yield entries
end
end

Expand All @@ -110,7 +110,7 @@ def each_serialized_batch(lazy_attributes: false, **kwargs)
# @return [Enumerator] All serialized entries
def documents(**kwargs)
Enumerator.new do |yielder|
each_serialized_batch(**kwargs) do |docs, **_collection_kargs|
each_serialized_batch(**kwargs) do |docs|
docs.each { |document| yielder.yield(document) }
end
end
Expand Down
46 changes: 46 additions & 0 deletions spec/esse/document_for_partial_update_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# frozen_string_literal: true

require 'spec_helper'

# rubocop:disable RSpec/VerifiedDoubles
RSpec.describe Esse::DocumentForPartialUpdate do
let(:document) { described_class.new(obj, source: source) }
let(:obj) { double(id: 1) }
let(:source) { { foo: :bar } }

describe '#object' do
subject { document.object }

it { is_expected.to be obj }
end

describe '#id' do
subject { document.id }

it { is_expected.to eq 1 }
end

describe '#type' do
subject { document.type }

let(:obj) { double(id: 1, type: 'foo', source: source) }

it { is_expected.to eq 'foo' }
end

describe '#routing' do
subject { document.routing }

let(:obj) { double(id: 1, routing: 'foo', source: source) }

it { is_expected.to eq 'foo' }
end

describe '#source' do
subject { document.source }

let(:obj) { double(id: 1, source: { original: 'source' }) }

it { is_expected.to eq source }
end
end
Loading

0 comments on commit 69e67c7

Please sign in to comment.