
PR #11541: Adjustments to send monitoring data directly to ES (Closed)
40 changes: 21 additions & 19 deletions config/logstash.yml
@@ -225,26 +225,28 @@ pipeline.ordered: auto
# Default is false
# pipeline.separate_logs: false
#
# ------------ X-Pack Settings (not applicable for OSS build)--------------
#
# X-Pack Monitoring
# https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html
#xpack.monitoring.enabled: false
#xpack.monitoring.elasticsearch.username: logstash_system
#xpack.monitoring.elasticsearch.password: password
#xpack.monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# ------------ Monitoring Settings (not applicable for OSS build)--------------
#
# https://www.elastic.co/guide/en/logstash/current/monitoring-internal-collection.html
# Enable monitoring via internal collector to an Elasticsearch monitoring cluster
#monitoring.enabled: false
#monitoring.cluster_uuid: elasticsearch_cluster_uuid
#monitoring.elasticsearch.username: logstash_system
#monitoring.elasticsearch.password: password
#monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.monitoring.elasticsearch.cloud_id: monitoring_cluster_id:xxxxxxxxxx
#xpack.monitoring.elasticsearch.cloud_auth: logstash_system:password
#xpack.monitoring.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ]
#xpack.monitoring.elasticsearch.ssl.truststore.path: path/to/file
#xpack.monitoring.elasticsearch.ssl.truststore.password: password
#xpack.monitoring.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.monitoring.elasticsearch.ssl.keystore.password: password
#xpack.monitoring.elasticsearch.ssl.verification_mode: certificate
#xpack.monitoring.elasticsearch.sniffing: false
#xpack.monitoring.collection.interval: 10s
#xpack.monitoring.collection.pipeline.details.enabled: true
#monitoring.elasticsearch.cloud_id: monitoring_cluster_id:xxxxxxxxxx
#monitoring.elasticsearch.cloud_auth: logstash_system:password
#monitoring.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ]
#monitoring.elasticsearch.ssl.truststore.path: path/to/file
#monitoring.elasticsearch.ssl.truststore.password: password
#monitoring.elasticsearch.ssl.keystore.path: /path/to/file
#monitoring.elasticsearch.ssl.keystore.password: password
#monitoring.elasticsearch.ssl.verification_mode: certificate
#monitoring.elasticsearch.sniffing: false
#monitoring.collection.interval: 10s
#monitoring.collection.pipeline.details.enabled: true
# ------------ X-Pack Settings (not applicable for OSS build)--------------
Member:

I think we might want to consider removing the xpack settings here, and just have the monitoring settings. We should also make a note to surface this in release notes and blogs for the 7.7.0 release.
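The rename discussed in this thread amounts to dropping the `xpack.` prefix from the monitoring keys while leaving every other setting untouched. A minimal Ruby sketch of that mapping (the helper name `migrate_monitoring_key` is illustrative, not part of this PR):

```ruby
# Hypothetical helper: maps a legacy xpack-prefixed monitoring setting
# to its new un-prefixed equivalent; all other keys pass through unchanged.
def migrate_monitoring_key(key)
  key.start_with?("xpack.monitoring.") ? key.sub(/\Axpack\./, "") : key
end

puts migrate_monitoring_key("xpack.monitoring.enabled")              # monitoring.enabled
puts migrate_monitoring_key("xpack.monitoring.elasticsearch.hosts")  # monitoring.elasticsearch.hosts
puts migrate_monitoring_key("pipeline.ordered")                      # pipeline.ordered
```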

#
# X-Pack Management
# https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
10 changes: 10 additions & 0 deletions docker/data/logstash/env2yaml/env2yaml.go
@@ -84,6 +84,16 @@ func normalizeSetting(setting string) (string, error)
"modules",
"path.logs",
"path.plugins",
"monitoring.enabled",
"monitoring.collection.interval",
"monitoring.elasticsearch.hosts",
"monitoring.elasticsearch.username",
"monitoring.elasticsearch.password",
"monitoring.elasticsearch.ssl.certificate_authority",
"monitoring.elasticsearch.ssl.truststore.path",
"monitoring.elasticsearch.ssl.truststore.password",
"monitoring.elasticsearch.ssl.keystore.path",
"monitoring.elasticsearch.ssl.keystore.password",
"xpack.monitoring.enabled",
"xpack.monitoring.collection.interval",
"xpack.monitoring.elasticsearch.hosts",
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/api/commands/stats.rb
@@ -169,7 +169,7 @@ def report(stats, extended_stats=nil, opts={})
# @param vertex [Hash{String=>Object}]
# @return [Hash{String=>Object}]
def decorate_vertex(vertex)
plugin_id = vertex["id"]&.to_s
plugin_id = vertex[:id]&.to_s
return vertex unless plugin_id && LogStash::PluginMetadata.exists?(plugin_id)

plugin_metadata = LogStash::PluginMetadata.for_plugin(plugin_id)
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/instrument/metric_store.rb
@@ -206,7 +206,7 @@ def key_paths(path)

# This method take an array of keys and recursively search the metric store structure
# and return a filtered hash of the structure. This method also take into consideration
# getting two different branchs.
# getting two different branches.
#
#
# If one part of the `key_paths` contains a filter key with the following format.
52 changes: 29 additions & 23 deletions x-pack/lib/helpers/elasticsearch_options.rb
@@ -26,38 +26,44 @@ def es_options_from_settings_or_modules(feature, settings)
# Populate the Elasticsearch options from LogStashSettings file, based on the feature that is being used.
# @return Hash
def es_options_from_settings(feature, settings)
prefix = if feature == "monitoring" &&
LogStash::MonitoringExtension.use_direct_shipping?(settings)
""
else
"xpack."
end
opts = {}

if cloud_id = settings.get("xpack.#{feature}.elasticsearch.cloud_id")
if cloud_id = settings.get("#{prefix}#{feature}.elasticsearch.cloud_id")
opts['cloud_id'] = cloud_id
check_cloud_id_configuration!(feature, settings)
check_cloud_id_configuration!(feature, settings, prefix)
else
opts['hosts'] = settings.get("xpack.#{feature}.elasticsearch.hosts")
opts['hosts'] = settings.get("#{prefix}#{feature}.elasticsearch.hosts")
end
if cloud_auth = settings.get("xpack.#{feature}.elasticsearch.cloud_auth")
if cloud_auth = settings.get("#{prefix}#{feature}.elasticsearch.cloud_auth")
opts['cloud_auth'] = cloud_auth
check_cloud_auth_configuration!(feature, settings)
check_cloud_auth_configuration!(feature, settings, prefix)
else
opts['user'] = settings.get("xpack.#{feature}.elasticsearch.username")
opts['password'] = settings.get("xpack.#{feature}.elasticsearch.password")
opts['user'] = settings.get("#{prefix}#{feature}.elasticsearch.username")
opts['password'] = settings.get("#{prefix}#{feature}.elasticsearch.password")
end
opts['sniffing'] = settings.get("xpack.#{feature}.elasticsearch.sniffing")
opts['ssl_certificate_verification'] = settings.get("xpack.#{feature}.elasticsearch.ssl.verification_mode") == 'certificate'
opts['sniffing'] = settings.get("#{prefix}#{feature}.elasticsearch.sniffing")
opts['ssl_certificate_verification'] = settings.get("#{prefix}#{feature}.elasticsearch.ssl.verification_mode") == 'certificate'

if cacert = settings.get("xpack.#{feature}.elasticsearch.ssl.certificate_authority")
if cacert = settings.get("#{prefix}#{feature}.elasticsearch.ssl.certificate_authority")
opts['cacert'] = cacert
opts['ssl'] = true
end

if truststore = settings.get("xpack.#{feature}.elasticsearch.ssl.truststore.path")
if truststore = settings.get("#{prefix}#{feature}.elasticsearch.ssl.truststore.path")
opts['truststore'] = truststore
opts['truststore_password'] = settings.get("xpack.#{feature}.elasticsearch.ssl.truststore.password")
opts['truststore_password'] = settings.get("#{prefix}#{feature}.elasticsearch.ssl.truststore.password")
opts['ssl'] = true
end

if keystore = settings.get("xpack.#{feature}.elasticsearch.ssl.keystore.path")
if keystore = settings.get("#{prefix}#{feature}.elasticsearch.ssl.keystore.path")
opts['keystore'] = keystore
opts['keystore_password']= settings.get("xpack.#{feature}.elasticsearch.ssl.keystore.password")
opts['keystore_password']= settings.get("#{prefix}#{feature}.elasticsearch.ssl.keystore.password")
opts['ssl'] = true
end
opts
@@ -135,19 +141,19 @@ def extract_module_settings(settings)

private

def check_cloud_id_configuration!(feature, settings)
return if !settings.set?("xpack.#{feature}.elasticsearch.hosts")
def check_cloud_id_configuration!(feature, settings, prefix)
return if !settings.set?("#{prefix}#{feature}.elasticsearch.hosts")

raise ArgumentError.new("Both \"xpack.#{feature}.elasticsearch.cloud_id\" and " +
"\"xpack.#{feature}.elasticsearch.hosts\" specified, please only use one of those.")
raise ArgumentError.new("Both \"#{prefix}#{feature}.elasticsearch.cloud_id\" and " +
"\"#{prefix}#{feature}.elasticsearch.hosts\" specified, please only use one of those.")
end

def check_cloud_auth_configuration!(feature, settings)
return if !settings.set?("xpack.#{feature}.elasticsearch.username") &&
!settings.set?("xpack.#{feature}.elasticsearch.password")
def check_cloud_auth_configuration!(feature, settings, prefix)
return if !settings.set?("#{prefix}#{feature}.elasticsearch.username") &&
!settings.set?("#{prefix}#{feature}.elasticsearch.password")

raise ArgumentError.new("Both \"xpack.#{feature}.elasticsearch.cloud_auth\" and " +
"\"xpack.#{feature}.elasticsearch.username\"/\"xpack.#{feature}.elasticsearch.password\" " +
raise ArgumentError.new("Both \"#{prefix}#{feature}.elasticsearch.cloud_auth\" and " +
"\"#{prefix}#{feature}.elasticsearch.username\"/\"#{prefix}#{feature}.elasticsearch.password\" " +
"specified, please only use one of those.")
end

47 changes: 44 additions & 3 deletions x-pack/lib/monitoring/inputs/metrics.rb
@@ -44,6 +44,7 @@ class Metrics < LogStash::Inputs::Base
def register
@global_stats = fetch_global_stats
@agent = nil
@cluster_uuids = nil
@settings = LogStash::SETTINGS.clone
@last_updated_pipeline_hashes = []
@agent = execution_context.agent if execution_context
@@ -105,15 +106,28 @@ def stop
end

def update(snapshot)
if LogStash::MonitoringExtension.use_direct_shipping?(LogStash::SETTINGS)
Member:
What happens in the case of configuration reloads? Will new cluster_uuids be picked up?

Contributor Author:
If direct shipping is enabled and a user changes the ES output configuration in one of the pipelines, the new cluster_uuids aren't picked up by the monitoring pipeline, because the monitoring pipeline is not restarted.
When the agent runs converge_state_and_update it reloads the changed pipelines and updates metrics, but it doesn't also reload the monitoring pipeline — should we trigger a refresh of that pipeline as well?

Contributor Author:
However, this behavior is roughly aligned with the non-direct-shipping case: there, the ES cluster used to ship data is loaded from logstash.yml, and that ES cluster is responsible for enriching the data with cluster_uuid, so changing that configuration also forces the user to restart Logstash.

Member:
I think I'm ok with this, and we can always follow up if this becomes an issue.

It's slightly different from the non-direct-shipping case: a logstash.yml change alters where the monitoring data is sent, whereas changes drawn from reloaded plugins alter where that data is shown in the monitoring cluster.

@cluster_uuids ||= extract_cluster_uuids(snapshot.metric_store)
end
update_stats(snapshot)
update_states
end

def update_stats(snapshot)
@logger.debug("Metrics input: received a new snapshot", :created_at => snapshot.created_at, :snapshot => snapshot) if @logger.debug?
if @cluster_uuids.nil? || @cluster_uuids.empty?
fire_stats_event(snapshot, nil)
else
@cluster_uuids.each do |cluster_uuid|
fire_stats_event(snapshot, cluster_uuid)
end
end
end
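update_stats now fans a single metrics snapshot out to one event per discovered cluster_uuid, falling back to a single event (with no UUID) when none were found. The shape of that fan-out, reduced to plain data — the function name here is illustrative, not the PR's API:

```ruby
# Returns the list of cluster_uuid values a snapshot should be emitted
# for: one event per UUID, or a single nil-UUID event when nothing
# was discovered from the elasticsearch output plugins.
def stats_event_targets(cluster_uuids)
  return [nil] if cluster_uuids.nil? || cluster_uuids.empty?
  cluster_uuids
end

p stats_event_targets(nil)           # [nil]
p stats_event_targets([])            # [nil]
p stats_event_targets(%w[u1 u2])     # ["u1", "u2"]
```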

private
def fire_stats_event(snapshot, cluster_uuid)
begin
event = StatsEventFactory.new(@global_stats, snapshot).make(agent, @extended_performance_collection)
event = StatsEventFactory.new(@global_stats, snapshot, cluster_uuid).make(agent, @extended_performance_collection, @collection_interval)
rescue => e
if @logger.debug?
@logger.error("Failed to create monitoring event", :message => e.message, :error => e.class.name, :backtrace => e.backtrace)
@@ -132,6 +146,7 @@ def update_stats(snapshot)
emit_event(event)
end

public
def update_states
return unless @agent

@@ -153,12 +168,19 @@ def update_states
def update_pipeline_state(pipeline)
return if pipeline.system?
if @config_collection
emit_event(state_event_for(pipeline))
events = state_event_for(pipeline)
events.each { |event| emit_event(event) }
end
end

def state_event_for(pipeline)
StateEventFactory.new(pipeline).make()
if @cluster_uuids.nil? || @cluster_uuids.empty?
[StateEventFactory.new(pipeline, nil, @collection_interval).make()]
else
@cluster_uuids.map do |cluster_uuid|
StateEventFactory.new(pipeline, cluster_uuid, @collection_interval).make()
end
end
end

def emit_event(event)
@@ -187,5 +209,24 @@ def fetch_global_stats
}
}
end

def extract_cluster_uuids(stats)
result = stats.extract_metrics([:stats, :pipelines, :main, :config], :cluster_uuids)
if result && !result[:cluster_uuids].empty?
cluster_uuids = result[:cluster_uuids]
@logger.info("Found cluster_uuids from elasticsearch output plugins", :cluster_uuids => cluster_uuids)
if LogStash::SETTINGS.set?("monitoring.cluster_uuid")
@logger.warn("Found monitoring.cluster_uuid setting configured in logstash.yml while using the ones discovered from elasticsearch output plugins, ignoring setting monitoring.cluster_uuid")
end
cluster_uuids
else
if LogStash::SETTINGS.set?("monitoring.cluster_uuid")
[LogStash::SETTINGS.get("monitoring.cluster_uuid")]
else
@logger.warn("Can't find any cluster_uuid from elasticsearch output plugins nor monitoring.cluster_uuid in logstash.yml is defined")
[""]
end
end
end
end
end; end
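extract_cluster_uuids gives UUIDs discovered from elasticsearch output plugins precedence over the monitoring.cluster_uuid setting, and falls back to a single empty string so downstream code always has at least one target. That precedence in isolation (the function name is illustrative; logging is omitted):

```ruby
# discovered: UUIDs pulled from elasticsearch output plugins (may be nil/empty)
# configured: the monitoring.cluster_uuid setting, or nil if unset
def resolve_cluster_uuids(discovered, configured)
  return discovered if discovered && !discovered.empty?
  configured ? [configured] : [""]
end

p resolve_cluster_uuids(["abc"], "from_yml")  # ["abc"]  (discovered wins; setting is ignored with a warning)
p resolve_cluster_uuids(nil, "from_yml")      # ["from_yml"]
p resolve_cluster_uuids([], nil)              # [""]
```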
28 changes: 21 additions & 7 deletions x-pack/lib/monitoring/inputs/metrics/state_event_factory.rb
@@ -5,16 +5,30 @@
module LogStash; module Inputs; class Metrics;
class StateEventFactory
require "logstash/config/lir_serializer"
def initialize(pipeline)

def initialize(pipeline, cluster_uuid, collection_interval = 10)
raise ArgumentError, "No pipeline passed in!" unless pipeline.is_a?(LogStash::Pipeline) || pipeline.is_a?(LogStash::JavaPipeline)
@event = LogStash::Event.new

@event.set("[@metadata]", {
"document_type" => "logstash_state",
"timestamp" => Time.now
})
pipeline_doc = {"pipeline" => pipeline_data(pipeline)}

if (LogStash::MonitoringExtension.use_direct_shipping?(LogStash::SETTINGS))
event_body = {
"type" => "logstash_state",
"logstash_state" => pipeline_doc,
"cluster_uuid" => cluster_uuid,
"interval_ms" => collection_interval * 1000,
"timestamp" => DateTime.now.strftime('%Y-%m-%dT%k:%M:%S.%L%z')
}
else
event_body = pipeline_doc
end

@event.set("[pipeline]", pipeline_data(pipeline))
@event = LogStash::Event.new(
{"@metadata" => {
"document_type" => "logstash_state",
"timestamp" => Time.now
}}.merge(event_body)
)

@event.remove("@timestamp")
@event.remove("@version")
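When direct shipping is enabled, the state document is wrapped in a monitoring-style envelope (type, payload under logstash_state, cluster_uuid, interval_ms, timestamp) instead of being sent bare, since there is no longer an intermediate ES monitoring exporter to add those fields. A sketch of that envelope construction, extracted from the factory — `wrap_state_doc` is not a name from this PR:

```ruby
require 'date'

# Wraps a pipeline-state document for direct shipping to the ES
# monitoring indices; with direct shipping off, the bare document
# is used as the event body, as before this change.
def wrap_state_doc(pipeline_doc, cluster_uuid, collection_interval, direct_shipping)
  return pipeline_doc unless direct_shipping
  {
    "type" => "logstash_state",
    "logstash_state" => pipeline_doc,
    "cluster_uuid" => cluster_uuid,
    "interval_ms" => collection_interval * 1000,
    "timestamp" => DateTime.now.strftime('%Y-%m-%dT%k:%M:%S.%L%z')
  }
end

doc = wrap_state_doc({ "pipeline" => {} }, "abc123", 10, true)
p doc["interval_ms"]  # 10000
```

StatsEventFactory in the next file applies the same envelope pattern to the metrics document, with "logstash_stats" as the type and payload key.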
28 changes: 22 additions & 6 deletions x-pack/lib/monitoring/inputs/metrics/stats_event_factory.rb
@@ -7,14 +7,15 @@ class StatsEventFactory
include ::LogStash::Util::Loggable
require 'logstash/config/pipelines_info'

def initialize(global_stats, snapshot)
def initialize(global_stats, snapshot, cluster_uuid)
@global_stats = global_stats
@snapshot = snapshot
@metric_store = @snapshot.metric_store
@cluster_uuid = cluster_uuid
end

def make(agent, extended_performance_collection=true)
LogStash::Event.new(
def make(agent, extended_performance_collection=true, collection_interval=10)
metrics_doc = {
"timestamp" => @snapshot.created_at,
"logstash" => fetch_node_stats(agent, @metric_store),
"events" => format_global_event_count(@metric_store),
Expand All @@ -24,10 +25,25 @@ def make(agent, extended_performance_collection=true)
"jvm" => format_jvm_stats(@metric_store),
"os" => format_os_stats(@metric_store),
"queue" => format_queue_stats(agent, @metric_store),
"@metadata" => {
}

if (LogStash::MonitoringExtension.use_direct_shipping?(LogStash::SETTINGS))
event_body = {
"type" => "logstash_stats",
"logstash_stats" => metrics_doc,
"cluster_uuid" => @cluster_uuid,
"interval_ms" => collection_interval * 1000,
"timestamp" => DateTime.now.strftime('%Y-%m-%dT%k:%M:%S.%L%z')
}
else
event_body = metrics_doc
end

LogStash::Event.new(
{"@metadata" => {
"document_type" => "logstash_stats",
"timestamp" => Time.now
}
}}.merge(event_body)
)
end

@@ -48,7 +64,7 @@ def format_jvm_stats(stats)
result["mem"] = {
"heap_used_in_bytes" => heap_stats[:used_in_bytes],
"heap_used_percent" => heap_stats[:used_percent],
"heap_max_in_bytes" => heap_stats[:max_in_bytes],
"heap_max_in_bytes" => heap_stats[:max_in_bytes],
}

result["gc"] = {