Skip to content

Commit

Permalink
better documentation on agent/runner/pipeline methods
Browse files Browse the repository at this point in the history
Fixes #4520
  • Loading branch information
jsvd committed Feb 5, 2016
1 parent e32f732 commit 7d8d6e4
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 13 deletions.
33 changes: 22 additions & 11 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@
class LogStash::Agent
attr_reader :logger, :pipelines

# initialize method for LogStash::Agent
# @param params [Hash] potential parameters are:
# :node_name [String] - identifier for the agent
# :auto_reload [Boolean] - enable reloading of pipelines
# :reload_interval [Integer] - reload pipelines every X seconds
# :logger [Cabin::Channel] - logger instance
def initialize(params)
@logger = params[:logger]
@auto_reload = params[:auto_reload]
@pipelines = {}

@node_name = params[:node_name] || Socket.gethostname
@config_loader = LogStash::Config::Loader.new(@logger)
@reload_interval = params[:reload_interval] || 3 # seconds
Expand Down Expand Up @@ -48,10 +54,10 @@ def execute
end
end

# register_pipeline adds a pipeline to the agent's state
# register_pipeline - adds a pipeline to the agent's state
# @param pipeline_id [String] pipeline string identifier
# @param settings [Hash] settings for the pipeline. keys should be symbols
# such as :pipeline_workers and :pipeline_batch_delay
# @param settings [Hash] settings that will be passed when creating the pipeline.
# keys should be symbols such as :pipeline_workers and :pipeline_batch_delay
def register_pipeline(pipeline_id, settings)
pipeline = create_pipeline(settings.merge(:pipeline_id => pipeline_id))
return unless pipeline.is_a?(LogStash::Pipeline)
Expand All @@ -60,11 +66,14 @@ def register_pipeline(pipeline_id, settings)

def reload_state!
@upgrade_mutex.synchronize do
@pipelines.each { |pipeline_id, _| reload_pipeline!(pipeline_id) }
@pipelines.each do |pipeline_id, _|
begin
reload_pipeline!(pipeline_id)
rescue => e
@logger.error I18n.t("oops", :error => e, :backtrace => e.backtrace)
end
end
end
rescue => e
@logger.error I18n.t("oops", :error => e, :backtrace => e.backtrace)
return 1
end

def shutdown
Expand All @@ -78,7 +87,7 @@ def node_uuid

def create_pipeline(settings)
begin
config = fetch_config(settings[:config_path], settings[:config_string])
config = fetch_config(settings)
rescue => e
@logger.error("failed to fetch pipeline configuration", :message => e.message)
return
Expand All @@ -92,10 +101,12 @@ def create_pipeline(settings)
end
end

def fetch_config(config_path, config_string)
@config_loader.format_config(config_path, config_string)
def fetch_config(settings)
@config_loader.format_config(settings[:config_path], settings[:config_string])
end

# since this method modifies the @pipelines hash it is
# wrapped in @upgrade_mutex in the parent call `reload_state!`
def reload_pipeline!(id)
old_pipeline = @pipelines[id]
new_pipeline = create_pipeline(old_pipeline.original_settings)
Expand Down
1 change: 0 additions & 1 deletion logstash-core/lib/logstash/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ def execute
return 1
rescue => e
@logger.fatal I18n.t("oops", :error => e, :backtrace => e.backtrace)
show_short_help
return 1
ensure
Stud::untrap("INT", sigint_id) unless sigint_id.nil?
Expand Down
3 changes: 2 additions & 1 deletion logstash-core/spec/logstash/agent_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@
end

it "should join the config string and config path content" do
fetched_config = subject.send(:fetch_config, tmp_config_path, cli_config)
settings = { :config_path => tmp_config_path, :config_string => cli_config }
fetched_config = subject.send(:fetch_config, settings)
expect(fetched_config.strip).to eq(cli_config + IO.read(tmp_config_path))
end

Expand Down

0 comments on commit 7d8d6e4

Please sign in to comment.