Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: added target configuration + event-factory support #31

Merged
merged 11 commits into from
Aug 4, 2021
21 changes: 21 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Be sure to replace `10.0.0.1` with the IP of your Logstash instance.
| <<plugins-{type}s-{plugin}-nan_value>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-prune_intervals>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-security_level>> |<<string,string>>, one of `["None", "Sign", "Encrypt"]`|No
| <<plugins-{type}s-{plugin}-target>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-typesdb>> |<<array,array>>|No
|=======================================================================

Expand Down Expand Up @@ -127,6 +128,26 @@ Prune interval records. Defaults to `true`.
Security Level. Default is `None`. This setting mirrors the setting from the
collectd https://collectd.org/wiki/index.php/Plugin:Network[Network plugin]

[id="plugins-{type}s-{plugin}-target"]
===== `target`

* Value type is <<string,string>>
* There is no default value for this setting.

Define the target field for placing the decoded values. If this setting is not
set, data will be stored at the root (top level) of the event.

For example, if you want data to be put under the `document` field:
[source,ruby]
input {
udp {
port => 12345
codec => collectd {
target => "[document]"
}
}
}

[id="plugins-{type}s-{plugin}-typesdb"]
===== `typesdb`

Expand Down
45 changes: 29 additions & 16 deletions lib/logstash/codecs/collectd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,8 @@
require "tempfile"
require "time"

import "javax.crypto.Mac"

class ProtocolError < LogStash::Error; end
class HeaderError < LogStash::Error; end
class EncryptionError < LogStash::Error; end
class NaNError < LogStash::Error; end
require 'logstash/plugin_mixins/event_support/event_factory_adapter'
require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'

# Read events from the collectd binary protocol over the network via udp.
# See https://collectd.org/wiki/index.php/Binary_protocol
Expand All @@ -38,15 +34,24 @@ class NaNError < LogStash::Error; end
# IgnoreSelected false
# </Plugin>
# <Plugin network>
# <Server "10.0.0.1" "25826">
# </Server>
# Server "10.0.0.1" "25826"
# </Plugin>
#
# Be sure to replace `10.0.0.1` with the IP of your Logstash instance.
#
class LogStash::Codecs::Collectd < LogStash::Codecs::Base

extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter

include LogStash::PluginMixins::EventSupport::EventFactoryAdapter

config_name "collectd"

class ProtocolError < LogStash::Error; end
class HeaderError < LogStash::Error; end
class EncryptionError < LogStash::Error; end
class NaNError < LogStash::Error; end

@@openssl_mutex = Mutex.new

AUTHFILEREGEX = /([^:]+): (.+)/
Expand Down Expand Up @@ -108,8 +113,7 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base

# Security Level. Default is `None`. This setting mirrors the setting from the
# collectd https://collectd.org/wiki/index.php/Plugin:Network[Network plugin]
config :security_level, :validate => [SECURITY_NONE, SECURITY_SIGN, SECURITY_ENCR],
:default => "None"
config :security_level, :validate => [SECURITY_NONE, SECURITY_SIGN, SECURITY_ENCR], :default => "None"

# What to do when a value in the event is `NaN` (Not a Number)
#
Expand All @@ -132,6 +136,12 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
# `Sign` or `Encrypt`
config :authfile, :validate => :string

# Defines a target field for placing decoded fields.
# If this setting is omitted, data gets stored at the root (top level) of the event.
#
# NOTE: the target is only relevant while decoding data into a new event.
config :target, :validate => :field_reference

public
def register
@logger.trace("Starting Collectd codec...")
Expand Down Expand Up @@ -467,14 +477,10 @@ def decode(payload)
# This is better than looping over all keys every time.
collectd.delete('type_instance') if collectd['type_instance'] == ""
collectd.delete('plugin_instance') if collectd['plugin_instance'] == ""
if add_nan_tag
collectd['tags'] ||= []
collectd['tags'] << @nan_tag
end
# This ugly little shallow-copy hack keeps the new event from getting munged by the cleanup
# With pass-by-reference we get hosed (if we pass collectd, then clean it up rapidly, values can disappear)
if !drop # Drop the event if it's flagged true
yield LogStash::Event.new(collectd.dup)
yield generate_event(collectd.dup, add_nan_tag)
else
raise(NaNError)
end
Expand All @@ -485,8 +491,15 @@ def decode(payload)
end
end
end # while payload.length > 0 do
rescue EncryptionError, ProtocolError, HeaderError, NaNError
rescue EncryptionError, ProtocolError, HeaderError, NaNError => e
# basically do nothing, we just want out
@logger.debug("Decode failure", payload: payload, message: e.message)
end # def decode

def generate_event(payload, add_nan_tag)
event = targeted_event_factory.new_event(payload)
event.tag @nan_tag if add_nan_tag
event
end

end # class LogStash::Codecs::Collectd
4 changes: 3 additions & 1 deletion logstash-codec-collectd.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-codec-collectd'
s.version = '3.0.8'
s.version = '3.1.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Reads events from the `collectd` binary protocol using UDP."
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -20,6 +20,8 @@ Gem::Specification.new do |s|

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-mixin-event_support', '~> 1.0'
s.add_runtime_dependency 'logstash-mixin-validator_support', '~> 1.0'

s.add_development_dependency 'logstash-devutils'
s.add_development_dependency 'insist'
Expand Down
30 changes: 30 additions & 0 deletions spec/codecs/collectd_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,36 @@
end
expect(counter).to eq(1)
end # it "should replace a NaN with a zero and add tag '_collectdNaN' by default"

context 'with target' do

subject do
LogStash::Codecs::Collectd.new("target" => "[foo]")
end

it "decodes with data into target field" do
payload = ["00000015746573742e6578616d706c652e636f6d000008000c14dc4c81831ef78b0009000c00000000400000000002000970696e67000004000970696e67000005001c70696e672d7461726765742e6578616d706c652e636f6d000006000f000101000000000000f87f"].pack('H*')
counter = 0
subject.decode(payload) do |event|
case counter
when 0
expect(event.include?("host")).to be false
expect(event.include?("plugin")).to be false

expect(event.get("[foo][host]")).to eq("test.example.com")
expect(event.get("[foo][plugin]")).to eq("ping")
expect(event.get("[foo][type_instance]")).to eq("ping-target.example.com")
expect(event.get("[foo][collectd_type]")).to eq("ping")
expect(event.get("[foo][value]")).to eq(0)
expect(event.get("tags")).to eq(["_collectdNaN"])
end
counter += 1
end
expect(counter).to eq(1)
end

end

end # context "None"

context "Replace nan_value and nan_tag with non-default values" do
Expand Down