Feat: ECS compatibility support (#63)
All event fields produced by the input are expected to be aligned with ECS.

resolves #60
kares authored Mar 22, 2021
1 parent bb02c01 commit 3de814d
Showing 5 changed files with 306 additions and 132 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 3.5.0
- Feat: ECS compatibility support [#63](https://github.com/logstash-plugins/logstash-input-syslog/pull/63)

## 3.4.5
- Added support for listening on IPv6 addresses

18 changes: 18 additions & 0 deletions docs/index.asciidoc
@@ -47,6 +47,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
[cols="<,<,<",options="header",]
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-ecs_compatibility>> | <<string,string>>|No
| <<plugins-{type}s-{plugin}-facility_labels>> |<<array,array>>|No
| <<plugins-{type}s-{plugin}-grok_pattern>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-host>> |<<string,string>>|No
@@ -64,6 +65,20 @@ input plugins.

&nbsp;

[id="plugins-{type}s-{plugin}-ecs_compatibility"]
===== `ecs_compatibility`

* Value type is <<string,string>>
* Supported values are:
** `disabled`: does not use ECS-compatible field names (for example, `priority` for syslog priority)
** `v1`: uses fields that are compatible with Elastic Common Schema (for example, `[log][syslog][priority]`)
* Default value depends on which version of Logstash is running:
** When Logstash provides a `pipeline.ecs_compatibility` setting, its value is used as the default
** Otherwise, the default value is `disabled`.

Controls this plugin's compatibility with the
{ecs-ref}[Elastic Common Schema (ECS)].
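
For example, a minimal pipeline configuration that pins the plugin to ECS mode regardless of the Logstash-level default might look like the following sketch (the port value is only illustrative):

[source,ruby]
----
input {
  syslog {
    port              => 5514
    ecs_compatibility => "v1"  # emit [log][syslog][*] fields instead of top-level priority/severity/facility
  }
}
----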

[id="plugins-{type}s-{plugin}-facility_labels"]
===== `facility_labels`

@@ -84,6 +99,9 @@ the facility_label is not added to the event.

* Value type is <<string,string>>
* Default value is `"<%{POSINT:priority}>%{SYSLOGLINE}"`
* Default value depends on whether <<plugins-{type}s-{plugin}-ecs_compatibility>> is enabled:
** ECS Compatibility disabled: `"<%{POSINT:priority}>%{SYSLOGLINE}"`
** ECS Compatibility enabled: `"<%{POSINT:[log][syslog][priority]:int}>%{SYSLOGLINE}"`

The default value should read and properly parse syslog lines which are
fully compliant with http://www.ietf.org/rfc/rfc3164.txt[RFC3164].
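
As an illustration only (this hypothetical configuration is not taken from this commit): an appliance that omits the leading `<PRI>` header could be accommodated by overriding the pattern.

[source,ruby]
----
input {
  syslog {
    port         => 5514
    grok_pattern => "%{SYSLOGLINE}"  # hypothetical format with no <PRI> prefix to capture
  }
}
----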
167 changes: 111 additions & 56 deletions lib/logstash/inputs/syslog.rb
@@ -6,6 +6,7 @@
require "logstash/filters/date"
require "logstash/inputs/base"
require "logstash/namespace"
require 'logstash/plugin_mixins/ecs_compatibility_support'
require "stud/interval"

# Read syslog messages as events over the network.
@@ -25,6 +26,8 @@
# Note: This input will start listeners on both TCP and UDP.
#
class LogStash::Inputs::Syslog < LogStash::Inputs::Base
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1)

config_name "syslog"

default :codec, "plain"
@@ -42,7 +45,7 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base

# Set custom grok pattern to parse the syslog, in case the format differs
# from the defined standard. This is common in security and other appliances
config :grok_pattern, :validate => :string, :default => "<%{POSINT:priority}>%{SYSLOGLINE}"
config :grok_pattern, :validate => :string

# Proxy protocol support, only v1 is supported at this time
# http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt
@@ -74,30 +77,76 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
#
config :locale, :validate => :string

public
def register
@metric_errors = metric.namespace(:errors)
# ECS only option to configure [service][type] value in produced events.
#
# NOTE: for now, purposefully un-documented as there are other [service] fields we could support,
# assuming users would want that (they have specific use-case for LS as syslog server).
config :service_type, :validate => :string, :default => 'system'

def initialize(*params)
super
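
# ecs_select (provided by the ecs_compatibility_support mixin) resolves to the entry matching
# the effective ecs_compatibility mode, so each field below gets a legacy or an ECS-style name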

@priority_key = ecs_select[disabled:'priority', v1:'[log][syslog][priority]']
@facility_key = ecs_select[disabled:'facility', v1:'[log][syslog][facility][code]']
@severity_key = ecs_select[disabled:'severity', v1:'[log][syslog][severity][code]']

@facility_label_key = ecs_select[disabled:'facility_label', v1:'[log][syslog][facility][name]']
@severity_label_key = ecs_select[disabled:'severity_label', v1:'[log][syslog][severity][name]']

@host_key = ecs_select[disabled:'host', v1:'[host][ip]']

@grok_pattern ||= ecs_select[
disabled:"<%{POSINT:#{@priority_key}}>%{SYSLOGLINE}",
v1:"<%{POSINT:#{@priority_key}:int}>%{SYSLOGLINE}"
]

@grok_filter = LogStash::Filters::Grok.new(
"overwrite" => @syslog_field,
"match" => { @syslog_field => @grok_pattern },
"tag_on_failure" => ["_grokparsefailure_sysloginput"],
"overwrite" => @syslog_field,
"match" => { @syslog_field => @grok_pattern },
"tag_on_failure" => ["_grokparsefailure_sysloginput"],
"ecs_compatibility" => ecs_compatibility # use ecs-compliant patterns
)
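
# NOTE: in ECS mode the raw line is copied to [event][original] before grok overwrites
# the syslog field, and [service][type] is filled in afterwards (see set_service_fields)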

@grok_filter_exec = ecs_select[
disabled: -> (event) { @grok_filter.filter(event) },
v1: -> (event) {
event.set('[event][original]', event.get(@syslog_field))
@grok_filter.filter(event)
set_service_fields(event)
}
]

@date_filter = LogStash::Filters::Date.new(
"match" => [ "timestamp", "MMM dd HH:mm:ss", "MMM d HH:mm:ss", "ISO8601"],
"locale" => @locale,
"timezone" => @timezone,
"match" => [ "timestamp", "MMM dd HH:mm:ss", "MMM d HH:mm:ss", "MMM d HH:mm:ss", "ISO8601"],
"locale" => @locale,
"timezone" => @timezone,
)

@date_filter_exec = ecs_select[
disabled: -> (event) {
# in legacy (non-ecs) mode we used to match (SYSLOGBASE2) timestamp into two fields
event.set("timestamp", event.get("timestamp8601")) if event.include?("timestamp8601")
@date_filter.filter(event)
},
v1: -> (event) {
@date_filter.filter(event)
event.remove('timestamp')
}
]
end

def register
@metric_errors = metric.namespace(:errors)

@grok_filter.register
@date_filter.register

@tcp_sockets = Concurrent::Array.new
@tcp = @udp = nil
end # def register

public
private

def run(output_queue)
udp_thr = Thread.new(output_queue) do |output_queue|
server(:udp, output_queue)
@@ -112,8 +161,8 @@ def run(output_queue)
udp_thr.join
tcp_thr.join
end # def run
public :run

private
# server calls the specified protocol listener and basically restarts on
# any uncaught listener exception
#
@@ -130,7 +179,6 @@ def server(protocol, output_queue)
end
end

private
# udp_listener creates the udp socket and continuously reads from it.
# upon exception the socket will be closed and the exception bubbled
# in the server which will restart the listener
@@ -151,7 +199,6 @@ def udp_listener(output_queue)
close_udp
end # def udp_listener

private
# tcp_listener accepts tcp connections and creates a new tcp_receiver thread
# for each accepted socket.
# upon exception all tcp sockets will be closed and the exception bubbled
@@ -177,22 +224,25 @@ def tcp_listener(output_queue)
# tcp_receiver is executed in a thread, any uncaught exception will be bubbled up to the
# tcp server thread and all tcp connections will be closed and the listener restarted.
def tcp_receiver(output_queue, socket)
ip, port = socket.peeraddr[3], socket.peeraddr[1]
first_read = true
peer_addr = socket.peeraddr
ip, port = peer_addr[3], peer_addr[1]

@logger.info("new connection", :client => "#{ip}:#{port}")
LogStash::Util::set_thread_name("input|syslog|tcp|#{ip}:#{port}")

first_read = true

socket.each do |line|
metric.increment(:messages_received)
if @proxy_protocol && first_read
first_read = false
pp_info = line.split(/\s/)
# PROXY proto clientip proxyip clientport proxyport
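# e.g. "PROXY TCP4 192.168.0.1 192.168.0.11 56324 443"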
if pp_info[0] != "PROXY"
@logger.error("invalid proxy protocol header label", :hdr => line)
@logger.error("invalid proxy protocol header label", header: line)
raise IOError
else
# would be nice to log the proxy host and port data as well, but minimizing changes
@logger.debug("proxy protocol detected", header: line)
ip = pp_info[2]
port = pp_info[3]
next
@@ -206,20 +256,19 @@ def tcp_receiver(output_queue, socket)
rescue Errno::EBADF
# swallow connection closed exceptions to avoid bubbling up the tcp_listener & server
logger.info("connection closed", :client => "#{ip}:#{port}")
rescue IOError => ioerror
rescue IOError => e
# swallow connection closed exceptions to avoid bubbling up the tcp_listener & server
raise unless socket.closed? && ioerror.message.include?("closed")
logger.info("connection error: #{ioerror.message}")
raise(e) unless socket.closed? && e.message.to_s.include?("closed")
logger.info("connection error:", :exception => e.class, :message => e.message)
ensure
@tcp_sockets.delete(socket)
socket.close rescue log_and_squash(:close_tcp_receiver_socket)
end

private
def decode(host, output_queue, data)
def decode(ip, output_queue, data)
@codec.decode(data) do |event|
decorate(event)
event.set("host", host)
event.set(@host_key, ip)
syslog_relay(event)
output_queue << event
metric.increment(:events)
@@ -230,13 +279,13 @@ def decode(host, output_queue, data)
@metric_errors.increment(:decoding)
end

public
# @see LogStash::Plugin#close
def stop
close_udp
close_tcp
end
public :stop

private
def close_udp
if @udp
@udp.close_read rescue log_and_squash(:close_udp_read)
Expand All @@ -245,8 +294,6 @@ def close_udp
@udp = nil
end

private

# Helper for inline rescues, which logs the exception at "DEBUG" level and returns nil.
#
# Instead of:
@@ -276,46 +323,54 @@ def close_tcp
# If the message cannot be recognized (see @grok_filter), we'll
# treat it like the whole event["message"] is correct and try to fill
# the missing pieces (host, priority, etc)
public
def syslog_relay(event)
@grok_filter.filter(event)
@grok_filter_exec.(event)

if event.get("tags").nil? || !event.get("tags").include?(@grok_filter.tag_on_failure)
# Per RFC3164, priority = (facility * 8) + severity
# = (facility << 3) | severity
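# e.g. priority 34 => facility 4 (security/auth), severity 2 (critical)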
priority = event.get("priority").to_i rescue 13
severity = priority & 7 # 7 is 111 (3 bits)
facility = priority >> 3
event.set("priority", priority)
event.set("severity", severity)
event.set("facility", facility)

event.set("timestamp", event.get("timestamp8601")) if event.include?("timestamp8601")
@date_filter.filter(event)
priority = event.get(@priority_key).to_i rescue 13
set_priority event, priority

@date_filter_exec.(event)

else
@logger.debug? && @logger.debug("NOT SYSLOG", :message => event.get("message"))
@logger.debug? && @logger.debug("un-matched syslog message", :message => event.get("message"))

# RFC3164 says unknown messages get pri=13
priority = 13
event.set("priority", 13)
event.set("severity", 5) # 13 & 7 == 5
event.set("facility", 1) # 13 >> 3 == 1
set_priority event, 13
metric.increment(:unknown_messages)
end

# Apply severity and facility metadata if
# use_labels => true
if @use_labels
facility_number = event.get("facility")
severity_number = event.get("severity")
# Apply severity and facility metadata if use_labels => true
set_labels(event) if @use_labels
end # def syslog_relay
public :syslog_relay

def set_priority(event, priority)
severity = priority & 7 # 7 is 111 (3 bits)
facility = priority >> 3
event.set(@priority_key, priority)
event.set(@severity_key, severity)
event.set(@facility_key, facility)
end

if @facility_labels[facility_number]
event.set("facility_label", @facility_labels[facility_number])
end
def set_labels(event)
facility_number = event.get(@facility_key)
severity_number = event.get(@severity_key)

if @severity_labels[severity_number]
event.set("severity_label", @severity_labels[severity_number])
end
facility_label = @facility_labels[facility_number]
event.set(@facility_label_key, facility_label) if facility_label

severity_label = @severity_labels[severity_number]
event.set(@severity_label_key, severity_label) if severity_label
end

def set_service_fields(event)
service_type = @service_type
if service_type && !service_type.empty?
event.set('[service][type]', service_type) unless event.include?('[service][type]')
end
end # def syslog_relay
end

end # class LogStash::Inputs::Syslog
5 changes: 3 additions & 2 deletions logstash-input-syslog.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-syslog'
s.version = '3.4.5'
s.version = '3.5.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Reads syslog messages as events"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -21,12 +21,13 @@ Gem::Specification.new do |s|

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~> 1.1'

s.add_runtime_dependency 'concurrent-ruby'
s.add_runtime_dependency 'stud', '>= 0.0.22', '< 0.1.0'

s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'logstash-filter-grok'
s.add_runtime_dependency 'logstash-filter-grok', '>= 4.4.0'
s.add_runtime_dependency 'logstash-filter-date'

s.add_development_dependency 'logstash-devutils'