Implement notes from Nick
robsears committed Apr 27, 2016
commit 0951991 (1 parent: 4c00cc3)
Showing 2 changed files with 79 additions and 31 deletions.
lib/searchyou/generator.rb (20 changes: 4 additions & 16 deletions)
@@ -19,7 +19,7 @@ def generate(site)
Searchyou.configure(site)

# Prepare the indexer
indexer = Searchyou::Indexer.new(Searchyou.configuration.url)
indexer = Searchyou::Indexer.new(Searchyou.configuration)
indexer.start

# Iterate through the site contents and send to indexer
@@ -48,23 +48,11 @@ def self.configure(site)
self.configuration ||= Configuration.new(site)
end

# Class containing configuration options
class Configuration
attr_accessor :url, :number_of_shards, :number_of_replicas, :index_name, :default_type

attr_accessor :site
def initialize(site)

# Figure out the Elasticsearch URL, from an environment variable or the
# Jekyll site configuration. Raises an exception if none is found, so we
# can skip the indexing.
@url = ENV['BONSAI_URL'] || ENV['ELASTICSEARCH_URL'] ||
((site.config||{})['elasticsearch']||{})['url'] ||
raise(ArgumentError, "No Elasticsearch URL present, skipping indexing")

# Get the rest of the config options, or use the defaults:
@number_of_shards = site.config['elasticsearch']['number_of_shards'] || 1
@number_of_replicas = site.config['elasticsearch']['number_of_replicas'] || 1
@index_name = site.config['elasticsearch']['index_name'] || "jekyll"
@default_type = site.config['elasticsearch']['default_type'] || 'post'
@site = site.config
end
end
end
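
With this change the generator hands the entire configuration object to the indexer, and Configuration shrinks to a thin wrapper exposing site.config; each Elasticsearch setting is now resolved inside the indexer at the point of use. A minimal sketch of the hash that wrapper would expose, assuming a hypothetical _config.yml with an elasticsearch block (all values are illustrative, not taken from the commit):

    # Roughly the shape of site.config that the simplified Configuration wraps.
    site_config = {
      'elasticsearch' => {
        'url'                 => 'http://localhost:9200',
        'number_of_shards'    => 1,
        'number_of_replicas'  => 1,
        'index_name'          => 'jekyll',
        'default_type'        => 'post'
      }
    }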
lib/searchyou/indexer.rb (90 changes: 75 additions & 15 deletions)
@@ -7,18 +7,46 @@ class Indexer
BATCH_SIZE = 50

attr_accessor :indexer_thread
attr_accessor :old_indices
attr_accessor :queue
attr_accessor :timestamp
attr_accessor :uri
attr_accessor :working

def initialize(elasticsearch_url)
def initialize(configuration)
self.uri = URI(elasticsearch_url)
self.queue = Queue.new
self.working = true
self.timestamp = Time.now
end

# Determine a URL for the cluster, or fail with error
def elasticsearch_url
ENV['BONSAI_URL'] || ENV['ELASTICSEARCH_URL'] ||
((Searchyou.configuration.site||{})['elasticsearch']||{})['url'] ||
raise(ArgumentError, "No Elasticsearch URL present, skipping indexing")
end

# Getter for the number of primary shards
def elasticsearch_number_of_shards
Searchyou.configuration.site['elasticsearch']['number_of_shards'] || 1
end

# Getter for the number of replicas
def elasticsearch_number_of_replicas
Searchyou.configuration.site['elasticsearch']['number_of_replicas'] || 1
end

# Getter for the index name
def elasticsearch_index_base_name
Searchyou.configuration.site['elasticsearch']['index_name'] || "jekyll"
end

# Getter for the default type
def elasticsearch_default_type
Searchyou.configuration.site['elasticsearch']['default_type'] || 'post'
end
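
These accessors resolve every setting at the point of use: for the URL, environment variables win, then the site's elasticsearch config, and with neither present an ArgumentError aborts indexing; the remaining settings fall back to built-in defaults. A rough sketch of the behaviour with made-up values (indexer stands for any Searchyou::Indexer instance):

    ENV['BONSAI_URL'] = 'http://user:pass@cluster.example.com'

    indexer.elasticsearch_url               # => "http://user:pass@cluster.example.com" (ENV wins)
    indexer.elasticsearch_number_of_shards  # => 1, from the site config or the default

    # With no BONSAI_URL, no ELASTICSEARCH_URL and no elasticsearch.url in the
    # site config, elasticsearch_url raises ArgumentError and indexing is skipped.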

# Public: Add new documents for batch indexing.
def <<(doc)
self.queue << doc
@@ -31,8 +59,8 @@ def working?

# A versioned index name, based on the time of the indexing run.
# Will be later added to an alias for hot reindexing.
def es_index_name
"#{Searchyou.configuration.index_name}-#{timestamp.strftime('%Y%m%d%H%M%S')}"
def elasticsearch_index_name
"#{elasticsearch_index_base_name}-#{timestamp.strftime('%Y%m%d%H%M%S')}"
end
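
With the default base name, a run whose timestamp is 2016-04-27 13:05:09 (an invented example) would therefore write into:

    elasticsearch_index_name  # => "jekyll-20160427130509"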

# Prepare an HTTP connection
@@ -46,14 +74,14 @@ def http_start(&block)

# Prepare our indexing run by creating a new index.
def prepare_index
create_index = http_post("/#{es_index_name}")
create_index = http_post("/#{elasticsearch_index_name}")
create_index.body = {
index: {
number_of_shards: Searchyou.configuration.number_of_shards,
number_of_shards: elasticsearch_number_of_shards,
number_of_replicas: 0,
refresh_interval: -1
}
}.to_json
}.to_json # TODO: index settings

http_start do |http|
resp = http.request(create_index)
@@ -76,9 +104,24 @@ def start
end
end

# Helper method for creating a Net::HTTP::Post to ES
def http_put(path)
http_request(Net::HTTP::Put, path)
end

def http_post(path)
req = Net::HTTP::Post.new(path)
http_request(Net::HTTP::Post, path)
end

def http_get(path)
http_request(Net::HTTP::Get, path)
end

def http_delete(path)
http_request(Net::HTTP::Delete, path)
end

def http_request(klass, path)
req = klass.new(path)
req.content_type = 'application/json'
req.basic_auth(uri.user, uri.password)
req
@@ -87,13 +130,11 @@ def http_post(path)
# Given a batch (array) of documents, index them into Elasticsearch
# using its Bulk Update API.
# https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
# TODO: choose a better type name, or make it configurable?
def es_bulk_insert!(http, batch)
bulk_insert = http_post("/#{es_index_name}/#{Searchyou.configuration.default_type}/_bulk")
bulk_insert = http_post("/#{elasticsearch_index_name}/#{elasticsearch_default_type}/_bulk")
bulk_insert.body = batch.map do |doc|
[ { :index => {} }.to_json, doc.to_json ].join("\n")
end.join("\n") + "\n"
puts bulk_insert.body
http.request(bulk_insert)
end
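
The body sent to the Bulk API is newline-delimited JSON: an empty index action (so Elasticsearch assigns the document IDs) followed by the document itself, one pair per document, ending with a trailing newline. A sketch of what a two-document batch would serialize to (the documents are invented):

    batch = [
      { title: 'Hello world', url: '/2016/04/27/hello.html' },
      { title: 'Second post', url: '/2016/04/28/second.html' }
    ]

    # es_bulk_insert! would build this request body:
    #   {"index":{}}
    #   {"title":"Hello world","url":"/2016/04/27/hello.html"}
    #   {"index":{}}
    #   {"title":"Second post","url":"/2016/04/28/second.html"}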

@@ -116,23 +157,42 @@ def finish
finalize!
end

def old_indices
resp = http_start { |h| h.request(http_get("/_cat/indices?h=index")) }
indices = JSON.parse(resp.body).map{|i|i['index']}
indices = indices.select{|i| i =~ /\A#{elasticsearch_index_base_name}/ }
indices = indices - [ elasticsearch_index_name ]
self.old_indices = indices
end
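
old_indices asks the cat API for every index name, keeps those sharing the base name, and drops the index this run is building, so only stale generations remain. A worked example with invented index names:

    # Parsed response the code expects from GET /_cat/indices?h=index:
    #   [{ "index" => "jekyll-20160101000000" },
    #    { "index" => "jekyll-20160427130509" },   # the index this run is building
    #    { "index" => "unrelated-logs" }]
    #
    # With a base name of "jekyll" and "jekyll-20160427130509" as the current
    # index, old_indices returns ["jekyll-20160101000000"].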

# Once documents are done being indexed, finalize the process by adding
# the new index into an alias for searching.
# TODO: cleanup old indices?
def finalize!
refresh = http_post("/#{es_index_name}/_refresh")
# refresh the index to make it searchable
refresh = http_post("/#{elasticsearch_index_name}/_refresh")

# add replication to the new index
add_replication = http_put("/#{elasticsearch_index_name}/_settings")
add_replication.body = { index: { number_of_replicas: elasticsearch_number_of_replicas }}.to_json

# hot swap the index into the canonical alias
update_aliases = http_post("/_aliases")
update_aliases.body = {
"actions": [
{ "remove": { "index": "*", "alias": "jekyll" }},
{ "add": { "index": es_index_name, "alias": "jekyll" }}
{ "remove": { "index": old_indices.join(','), "alias": elasticsearch_index_base_name }},
{ "add": { "index": elasticsearch_index_name, "alias": elasticsearch_index_base_name }}
]
}.to_json

# delete old indices
cleanup_indices = http_delete("/#{old_indices.join(',')}")

# run the prepared requests
http_start do |http|
http.request(refresh)
http.request(add_replication)
http.request(update_aliases)
http.request(cleanup_indices)
end
end
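
finalize! performs the hot swap in four prepared requests: refresh the new index so the bulk-loaded documents become searchable, restore replication (the index was created with zero replicas and refreshing disabled to speed up the bulk load), atomically move the alias from the old generations onto the new index, and delete the old indices. Because searches address only the alias, readers never see a partially built index. A hedged sketch of querying through that alias once the swap completes (host, field and query are illustrative, not part of this commit):

    require 'net/http'
    require 'json'
    require 'uri'

    # Search the stable alias (the base name, e.g. "jekyll"), never a
    # timestamped index directly.
    uri = URI('http://localhost:9200/jekyll/_search')
    req = Net::HTTP::Post.new(uri.request_uri)
    req.content_type = 'application/json'
    req.body = { query: { match: { title: 'hello' } } }.to_json

    res = Net::HTTP.start(uri.host, uri.port) { |http| http.request(req) }
    puts JSON.parse(res.body)['hits']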

