Fixes error indexing (very) large records from PDC Describe #724

Merged
8 commits, merged on Jan 13, 2025
31 changes: 17 additions & 14 deletions app/lib/describe_indexer.rb
@@ -39,20 +39,12 @@ def index
end
end

- # Given a json document, return an XML string that contains
- # the JSON blob as a CDATA element
+ # Converts the JSON payload to XML, which is what Traject expects
# @param [String] json
# @return [String]
def prep_for_indexing(json)
parsed = JSON.parse(json)
- xml = parsed.to_xml
- doc = Nokogiri::XML(xml)
- collection_node = doc.at('group')
- cdata = Nokogiri::XML::CDATA.new(doc, json)
- collection_node.add_next_sibling("<pdc_describe_json></pdc_describe_json>")
- pdc_describe_json_node = doc.at('pdc_describe_json')
- pdc_describe_json_node.add_child(cdata)
- doc.to_s
+ parsed.to_xml
@hectorcorrea (Member, Author) commented on Jan 10, 2025:
This is the biggest change in this PR. Instead of adding a CDATA XML element with the PDC record as JSON here, we use the JSON that is available in Traject as-is (see below).
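For reference, the removed code wrapped the original JSON payload in a CDATA node next to the <group> element, so the XML handed to Traject looked roughly like this (an illustrative sketch; the element names are inferred from the removed code and the XPath expressions in the Traject config, and the payload is made up):

  <hash>
    <resource>...</resource>
    <group>...</group>
    <pdc_describe_json><![CDATA[{"resource":{"doi":"10.34770/example",...}}]]></pdc_describe_json>
    <files>...</files>
  </hash>

The new version returns only the plain Hash#to_xml output; the JSON is rebuilt from that XML in the each_record block of the Traject config below.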

end

def index_one(json)
@@ -96,24 +88,35 @@ def perform_indexing
urls_to_retry = []
rss_url_list.each do |url|
process_url(url)
- rescue
+ rescue => ex
+ Rails.logger.warn "Indexing: Error importing record from #{url}. Will retry. Exception: #{ex.message}"
urls_to_retry << url
end

# retry any errored urls a second time and send an error only if they don't work a second time
urls_to_retry.each do |url|
Rails.logger.info "Indexing: Retrying record #{url}."
process_url(url)
rescue => ex
Rails.logger.warn "Error importing record from #{url}. Exception: #{ex.message}"
Rails.logger.error "Indexing: Error importing record from #{url}. Retry failed. Exception: #{ex.message}"
Honeybadger.notify "Error importing record from #{url}. Exception: #{ex.message}"
end
end

def process_url(url)
- uri = URI.open(url, open_timeout: 30, read_timeout: 30)
+ # Bumping the timeout to 60 seconds because datasets with lots of files (e.g. more than 30K files)
+ # can take a while to be read (for example https://pdc-describe-prod.princeton.edu/describe/works/470.json)
+ start_read = Time.zone.now
+ uri = URI.open(url, open_timeout: 60, read_timeout: 60)
resource_json = uri.read
+ elapsed_read = Time.zone.now - start_read

+ start_index = Time.zone.now
resource_xml = prep_for_indexing(resource_json)
traject_indexer.process(resource_xml)
Rails.logger.info "Successfully imported record from #{url}."
elapsed_index = Time.zone.now - start_index

timing_info = "(read: #{format('%.2f', elapsed_read)} s, index: #{format('%.2f', elapsed_index)} s)"
Rails.logger.info "Indexing: Successfully imported record from #{url}. #{timing_info} "
end
end
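With the timing instrumentation above, a successful import now logs a line like the following (the timings shown here are made up for illustration):

  Indexing: Successfully imported record from https://pdc-describe-prod.princeton.edu/describe/works/470.json. (read: 42.17 s, index: 12.03 s)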
3 changes: 2 additions & 1 deletion config/pdc_discovery.yml
@@ -25,5 +25,6 @@ production:

staging:
<<: *default
- pdc_describe_rss: <%= ENV["PDC_DESCRIBE_RSS"] || "https://pdc-describe-staging.princeton.edu/describe/works.rss" %>
+ # Notice that we fetch production data for indexing since it is more realistic
+ pdc_describe_rss: <%= ENV["PDC_DESCRIBE_RSS"] || "https://pdc-describe-prod.princeton.edu/describe/works.rss" %>
plausible_site_id: <%= "pdc-discovery-staging.princeton.edu" %>
7 changes: 5 additions & 2 deletions config/schedule.rb
@@ -28,7 +28,10 @@
# rake "index:research_data"
# end

- # Rebuild index completely every 30 minutes while we're doing active data migration
- every 30.minutes, roles: [:reindex] do
+ # Rebuild index completely every 60 minutes
+ #
+ # Bumped the schedule to 60 minutes since it's taking close to 30 minutes now that we are
+ # indexing datasets with a very large number of files.
+ every 60.minutes, roles: [:reindex] do
rake "index:research_data"
end
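For context, with the whenever gem an every 60.minutes entry is rendered as an hourly crontab line roughly like the one below (a sketch only; the path is a placeholder and the exact output depends on the job template and deploy settings):

  0 * * * * /bin/bash -l -c 'cd /path/to/pdc_discovery && RAILS_ENV=production bundle exec rake index:research_data --silent'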
38 changes: 28 additions & 10 deletions config/traject/pdc_describe_indexing_config.rb
@@ -12,10 +12,26 @@
provide 'solr.url', Indexing::SolrCloudHelper.collection_writer_url
provide 'reader_class_name', 'Traject::NokogiriReader'
provide 'solr_writer.commit_on_close', 'true'

+ # There are some parameters in Traject that allow us to configure values related
+ # to the Solr connection, in particular `batch_size` and `thread_pool`. However,
+ # given that we are calling Traject for each individual record (rather than for a
+ # batch of records), they might not apply to our scenario.
+ #
+ # The documentation is here in case we want to try them out:
+ # https://www.rubydoc.info/gems/traject/Traject/SolrJsonWriter

provide 'repository', ENV['REPOSITORY_ID']
provide 'logger', Logger.new($stderr, level: Logger::WARN)
end
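For reference, if we ever feed Traject batches of records instead of one record at a time, those writer settings could be supplied inside the settings block along these lines (a sketch only, not part of this PR; the values are made up):

  provide 'solr_writer.batch_size', 50
  provide 'solr_writer.thread_pool', 2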

+ # Converting the XML to JSON is a bit expensive, so we make that conversion
+ # only once per record and save it to the context so that we can re-use it.
+ each_record do |record, context|
+ xml = record.xpath("/hash").first.to_xml
+ context.clipboard[:record_json] = Hash.from_xml(xml)["hash"].to_json
@hectorcorrea (Member, Author):
This is the same data we were adding to the XML CDATA element above.

+ end
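In other words, this block reverses the conversion done in DescribeIndexer#prep_for_indexing, so context.clipboard[:record_json] ends up holding essentially the original JSON from PDC Describe. A rough round trip with a made-up payload:

  json = '{"resource":{"doi":"10.34770/example"}}'
  xml  = JSON.parse(json).to_xml                  # Hash#to_xml wraps the data in a <hash> root element
  Hash.from_xml(xml)["hash"].to_json              # roughly the original JSON string again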

# ==================
# Main fields

@@ -25,10 +41,8 @@
accumulator.concat [munged_doi]
end

- # the <pdc_describe_json> element contains a CDATA node with a JSON blob in it
- to_field 'pdc_describe_json_ss' do |record, accumulator, _c|
- datacite = record.xpath("/hash/pdc_describe_json/text()").first.content
- accumulator.concat [datacite]
+ to_field 'pdc_describe_json_ss' do |_record, accumulator, context|
+ accumulator.concat [context.clipboard[:record_json]]
@hectorcorrea (Member, Author):
Use the value that we set in the each_record block.

end

# Track the source of this record
@@ -99,21 +113,21 @@
end

# Extract the author data from the pdc_describe_json and save it on its own field as JSON
- to_field 'authors_json_ss' do |record, accumulator, _c|
- pdc_json = record.xpath("/hash/pdc_describe_json/text()").first.content
+ to_field 'authors_json_ss' do |_record, accumulator, context|
+ pdc_json = context.clipboard[:record_json]
authors = JSON.parse(pdc_json).dig("resource", "creators") || []
accumulator.concat [authors.to_json]
end

- to_field 'authors_orcid_ssim' do |record, accumulator, _c|
- pdc_json = record.xpath("/hash/pdc_describe_json/text()").first.content
+ to_field 'authors_orcid_ssim' do |_record, accumulator, context|
+ pdc_json = context.clipboard[:record_json]
authors_json = JSON.parse(pdc_json).dig("resource", "creators") || []
orcids = authors_json.map { |author| Author.new(author).orcid }
accumulator.concat orcids.compact.uniq
end

- to_field 'authors_affiliation_ssim' do |record, accumulator, _c|
- pdc_json = record.xpath("/hash/pdc_describe_json/text()").first.content
+ to_field 'authors_affiliation_ssim' do |_record, accumulator, context|
+ pdc_json = context.clipboard[:record_json]
authors_json = JSON.parse(pdc_json).dig("resource", "creators") || []
affiliations = authors_json.map { |author| Author.new(author).affiliation_name }
accumulator.concat affiliations.compact.uniq
@@ -223,6 +237,10 @@

# ==================
# Store files metadata as a single JSON string so that we can display detailed information for each of them.
+ #
+ # TODO: Note that this information duplicates what we save in `pdc_describe_json_ss`. For large
+ # datasets (e.g. those with 60K files) this is less than ideal. We should look into optimizing
+ # this when we take care of https://github.com/pulibrary/pdc_discovery/issues/738
to_field 'files_ss' do |record, accumulator, _context|
raw_doi = record.xpath("/hash/resource/doi/text()").to_s
files = record.xpath("/hash/files/file").map do |file|