Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ECS compatibility #165

Merged
merged 5 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 6.2.0
- Added ECS Compatibility Mode [#165](https://github.com/logstash-plugins/logstash-input-tcp/pull/165)
- When operating in an ECS Compatibility mode, metadata about the connection on which we are receiving data is nested in well-named fields under `[@metadata][input][tcp]` instead of at the root level.
- Fix: source address is no longer missing when a proxy is present

## 6.1.1
- Changed jar dependencies to reflect newer versions [#179](https://github.com/logstash-plugins/logstash-input-http/pull/179)

Expand Down
62 changes: 62 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,53 @@ event timestamp
}
}

[id="plugins-{type}s-{plugin}-ecs_metadata"]
==== Event Metadata and the Elastic Common Schema (ECS)
yaauie marked this conversation as resolved.
Show resolved Hide resolved

In addition to decoding the events, this input will add metadata about the TCP connection itself to each event.
This can be helpful when applications are configured to send events directly to this input's TCP listener without including information about themselves.

Historically, this metadata was added to a variety of non-standard top-level fields, which had the potential to create confusion and schema conflicts downstream.
With ECS compatibility mode, we can ensure a pipeline still has access to this metadata throughout the event's lifecycle without polluting the top-level namespace.

.Source Metadata Location by `ecs_compatibility` value
yaauie marked this conversation as resolved.
Show resolved Hide resolved
[cols="3,7,5"]
|=======================================================================
| Metadata Group | ecs: `v1`, `v8` | ecs: `disabled`

.3+|Source Metadata from the TCP connection
on which events are being received, including
the sender's name, ip, and outbound port. l|[@metadata][input][tcp][source][name] l|[host]
l|[@metadata][input][tcp][source][ip] l|[@metadata][ip_address]
l|[@metadata][input][tcp][source][port] l|[port]

.2+|Proxy Metadata from a proxied TCP connection.
Available when receiving events by proxy and
`proxy_protocol => true` l|[@metadata][input][tcp][proxy][ip] l|[proxy_host]
l|[@metadata][input][tcp][proxy][port] l|[proxy_port]

.1+|SSL Subject Metadata from a secured TCP
connection. Available when `ssl_enable => true`
AND `ssl_verify => true` l|[@metadata][input][tcp][ssl][subject] l|[sslsubject]
|=======================================================================

For example, the Elastic Common Schema reserves the https://www.elastic.co/guide/en/ecs/current/ecs-host.html[top-level `host` field] for information about the host on which the event happened.
If an event is missing this metadata, it can be copied into place from the source TCP connection metadata that has been added to the event:

[source,txt]
-----
filter {
if [@metadata][input][tcp][source] and not [host] {
mutate {
copy {
"[@metadata][input][tcp][source][name]" => "[host][name]"
"[@metadata][input][tcp][source][ip]" => "[host][ip]"
}
}
}
}
-----

[id="plugins-{type}s-{plugin}-options"]
==== Tcp Input Configuration Options

Expand All @@ -79,6 +126,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-dns_reverse_lookup_enabled>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-ecs_compatibility>> | <<string,string>>|No
| <<plugins-{type}s-{plugin}-host>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-mode>> |<<string,string>>, one of `["server", "client"]`|No
| <<plugins-{type}s-{plugin}-port>> |<<number,number>>|Yes
Expand Down Expand Up @@ -108,6 +156,20 @@ It is possible to avoid DNS reverse-lookups by disabling this setting. If disabl
the address metadata that is added to events will contain the source address as-specified
at the TCP layer and IPs will not be resolved to hostnames.

[id="plugins-{type}s-{plugin}-ecs_compatibility"]
===== `ecs_compatibility`

* Value type is <<string,string>>
* Supported values are:
** `disabled`: unstructured connection metadata added at root level
** `v1`,`v8`: structured connection metadata added under `[@metadata][input][tcp]`
* Default value depends on which version of Logstash is running:
** When Logstash provides a `pipeline.ecs_compatibility` setting, its value is used as the default
** Otherwise, the default value is `disabled`.

Controls this plugin's compatibility with the https://www.elastic.co/guide/en/ecs/current/index.html[Elastic Common Schema (ECS)].
The value of this setting affects the <<plugins-{type}s-{plugin}-ecs_metadata,placement of a TCP connection's metadata>> on events.

[id="plugins-{type}s-{plugin}-host"]
===== `host`

Expand Down
38 changes: 24 additions & 14 deletions lib/logstash/inputs/tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
require "logstash/inputs/base"
require "logstash/util/socket_peer"
require "logstash-input-tcp_jars"
require "logstash/inputs/tcp/decoder_impl"
require 'logstash/plugin_mixins/ecs_compatibility_support'

require "socket"
require "openssl"
Expand Down Expand Up @@ -63,6 +63,11 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
java_import 'org.logstash.tcp.InputLoop'
java_import 'org.logstash.tcp.SslContextBuilder'

require_relative "tcp/decoder_impl"

# ecs_compatibility option, provided by Logstash core or the support adapter.
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)

config_name "tcp"

default :codec, "line"
Expand Down Expand Up @@ -113,13 +118,6 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
# Option to allow users to avoid DNS Reverse Lookup.
config :dns_reverse_lookup_enabled, :validate => :boolean, :default => true

HOST_FIELD = "host".freeze
HOST_IP_FIELD = "[@metadata][ip_address]".freeze
PORT_FIELD = "port".freeze
PROXY_HOST_FIELD = "proxy_host".freeze
PROXY_PORT_FIELD = "proxy_port".freeze
SSLSUBJECT_FIELD = "sslsubject".freeze

# Monkey patch TCPSocket and SSLSocket to include socket peer
# @private
def self.patch_socket_peer!
Expand All @@ -134,6 +132,8 @@ def self.patch_socket_peer!
def initialize(*args)
super(*args)

setup_fields!

self.class.patch_socket_peer!

# threadsafe socket bookkeeping
Expand Down Expand Up @@ -186,8 +186,8 @@ def decode_buffer(client_ip_address, client_address, client_port, codec, proxy_a
proxy_port, tbuf, socket)
codec.decode(tbuf) do |event|
if @proxy_protocol
event.set(PROXY_HOST_FIELD, proxy_address) unless event.get(PROXY_HOST_FIELD)
event.set(PROXY_PORT_FIELD, proxy_port) unless event.get(PROXY_PORT_FIELD)
event.set(@field_proxy_host, proxy_address) unless event.get(@field_proxy_host)
event.set(@field_proxy_port, proxy_port) unless event.get(@field_proxy_port)
end
enqueue_decorated(event, client_ip_address, client_address, client_port, socket)
end
Expand Down Expand Up @@ -260,14 +260,24 @@ def handle_socket(socket)
end

def enqueue_decorated(event, client_ip_address, client_address, client_port, socket)
event.set(HOST_FIELD, client_address) unless event.get(HOST_FIELD)
event.set(HOST_IP_FIELD, client_ip_address) unless event.get(HOST_IP_FIELD)
event.set(PORT_FIELD, client_port) unless event.get(PORT_FIELD)
event.set(SSLSUBJECT_FIELD, socket.peer_cert.subject.to_s) if socket && @ssl_enable && @ssl_verify && event.get(SSLSUBJECT_FIELD).nil?
event.set(@field_host, client_address) unless event.get(@field_host)
event.set(@field_host_ip, client_ip_address) unless event.get(@field_host_ip)
event.set(@field_port, client_port) unless event.get(@field_port)
event.set(@field_sslsubject, socket.peer_cert.subject.to_s) if socket && @ssl_enable && @ssl_verify && event.get(@field_sslsubject).nil?
decorate(event)
@output_queue << event
end

# setup the field names, with respect to ECS compatibility.
def setup_fields!
@field_host = ecs_select[disabled: "host", v1: "[@metadata][input][tcp][source][name]" ].freeze
@field_host_ip = ecs_select[disabled: "[@metadata][ip_address]", v1: "[@metadata][input][tcp][source][ip]" ].freeze
@field_port = ecs_select[disabled: "port", v1: "[@metadata][input][tcp][source][port]" ].freeze
@field_proxy_host = ecs_select[disabled: "proxy_host", v1: "[@metadata][input][tcp][proxy][ip]" ].freeze
@field_proxy_port = ecs_select[disabled: "proxy_port", v1: "[@metadata][input][tcp][proxy][port]" ].freeze
@field_sslsubject = ecs_select[disabled: "sslsubject", v1: "[@metadata][input][tcp][tls][client][subject]"].freeze
end

def server?
@mode == "server"
end
Expand Down
21 changes: 12 additions & 9 deletions lib/logstash/inputs/tcp/decoder_impl.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# encoding: utf-8
require 'java'

class DecoderImpl
class LogStash::Inputs::Tcp::DecoderImpl
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💛


include org.logstash.tcp.Decoder

Expand All @@ -24,7 +24,7 @@ def decode(channel_addr, data)
end

def copy
DecoderImpl.new(@codec.clone, @tcp)
self.class.new(@codec.clone, @tcp)
end

def flush
Expand All @@ -41,23 +41,26 @@ def init_first_read(channel_addr, received)
@tcp.logger.error("Invalid proxy protocol header label", :header => pp_hdr)
raise IOError.new("Invalid proxy protocol header label #{pp_hdr.inspect}")
else
@proxy_address = pp_info[3]
@proxy_port = pp_info[5]
@address = pp_info[2]
@port = pp_info[4]
@proxy_address = pp_info[3] # layer 3 destination address (proxy's receiving address)
@proxy_port = pp_info[5] # TCP destination port (proxy's receiving port)
@ip_address = pp_info[2] # layer 3 source address (outgoing ip of sender)
@address = extract_host_name(@ip_address)
@port = pp_info[4] # TCP source port (outgoing port on sender [probably random])
end
else
filtered = received
@ip_address = channel_addr.get_address.get_host_address
@address = extract_host_name(channel_addr)
@port = channel_addr.get_port
@ip_address = channel_addr.get_address.get_host_address # ip address of sender
@address = extract_host_name(channel_addr) # name _or_ address of sender
@port = channel_addr.get_port # outgoing port of sender (probably random)
end
@first_read = false
filtered
end

private
def extract_host_name(channel_addr)
channel_addr = java.net.InetSocketAddress.new(channel_addr, 0) if channel_addr.kind_of?(String)

return channel_addr.get_host_string unless @tcp.dns_reverse_lookup_enabled?

channel_addr.get_host_name
Expand Down
1 change: 1 addition & 0 deletions logstash-input-tcp.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Gem::Specification.new do |s|

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~>1.2'

s.add_runtime_dependency 'logstash-core', '>= 6.7.0'

Expand Down
Loading