Skip to content

Commit

Permalink
add ecs support (#179)
Browse files Browse the repository at this point in the history
Add ECS support
* skip country_code3
* auto set `target` from `source`

Fixed: #163

Co-authored-by: Ry Biesemeyer <ry.biesemeyer@elastic.co>
  • Loading branch information
kaisecheng and yaauie authored Mar 25, 2021
1 parent 118df48 commit f9de15b
Show file tree
Hide file tree
Showing 9 changed files with 474 additions and 143 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 7.1.0
- Add ECS compatibility [#179](https://github.com/logstash-plugins/logstash-filter-geoip/pull/179)

## 7.0.1
- [DOC] Add documentation for MaxMind database license change [#177](https://github.com/logstash-plugins/logstash-filter-geoip/pull/177)

Expand Down
24 changes: 23 additions & 1 deletion docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-cache_size>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-database>> |a valid filesystem path|No
| <<plugins-{type}s-{plugin}-default_database_type>> |`City` or `ASN`|No
| <<plugins-{type}s-{plugin}-ecs_compatibility>> | <<string,string>>|No
| <<plugins-{type}s-{plugin}-fields>> |<<array,array>>|No
| <<plugins-{type}s-{plugin}-source>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-tag_on_failure>> |<<array,array>>|No
Expand Down Expand Up @@ -164,6 +165,20 @@ For the built-in GeoLite2 City database, the following are available:
`dma_code`, `ip`, `latitude`, `location`, `longitude`, `postal_code`, `region_code`,
`region_name` and `timezone`.

[id="plugins-{type}s-{plugin}-ecs_compatibility"]
===== `ecs_compatibility`

* Value type is <<string,string>>
* Supported values are:
** `disabled`: unstructured geo data added at root level
** `v1`: uses fields that are compatible with Elastic Common Schema (for example, `[client][geo][country_name]`)
* Default value depends on which version of Logstash is running:
** When Logstash provides a `pipeline.ecs_compatibility` setting, its value is used as the default
** Otherwise, the default value is `disabled`.

Controls this plugin's compatibility with the {ecs-ref}[Elastic Common Schema (ECS)].
The value of this setting affects the _default_ value of <<plugins-{type}s-{plugin}-target>>.

[id="plugins-{type}s-{plugin}-source"]
===== `source`

Expand All @@ -185,8 +200,15 @@ Tags the event on failure to look up geo information. This can be used in later
[id="plugins-{type}s-{plugin}-target"]
===== `target`

* This is an optional setting with condition.
* Value type is <<string,string>>
* Default value is `"geoip"`
* Default value depends on whether <<plugins-{type}s-{plugin}-ecs_compatibility>> is enabled:
** ECS Compatibility disabled: `geoip`
** ECS Compatibility enabled: If `source` is an `ip` sub-field, eg. `[client][ip]`,
`target` will automatically set to the parent field, in this example `client`,
otherwise, `target` is a required setting
*** `geo` field is nested in `[client][geo]`
*** ECS compatible values are `client`, `destination`, `host`, `observer`, `server`, `source`

Specify the field into which Logstash should store the geoip data.
This can be useful, for example, if you have `src_ip` and `dst_ip` fields and
Expand Down
55 changes: 45 additions & 10 deletions lib/logstash/filters/geoip.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require "logstash/filters/base"
require "logstash/namespace"
require "logstash-filter-geoip_jars"
require "logstash/plugin_mixins/ecs_compatibility_support"


# The GeoIP filter adds information about the geographical location of IP addresses,
Expand Down Expand Up @@ -31,6 +32,8 @@
# --

class LogStash::Filters::GeoIP < LogStash::Filters::Base
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1)

config_name "geoip"

# The path to the GeoLite2 database file which Logstash should use. City and ASN databases are supported.
Expand Down Expand Up @@ -60,14 +63,16 @@ class LogStash::Filters::GeoIP < LogStash::Filters::Base
# This can be useful, for example, if you have `src_ip` and `dst_ip` fields and
# would like the GeoIP information of both IPs.
#
# If you save the data to a target field other than `geoip` and want to use the
# `geo_point` related functions in Elasticsearch, you need to alter the template
# provided with the Elasticsearch output and configure the output to use the
# new template.
# ECS disabled/ Legacy default: `geoip`
# ECS default: The `target` is auto-generated from `source` when the `source` specifies an `ip` sub-field
# For example, source => [client][ip], `target` will be `client`
# If `source` is not an `ip` sub-field, source => client_ip, `target` setting is mandatory
#
# Elasticsearch ECS mode expected `geo` fields to be nested at:
# `client`, `destination`, `host`, `observer`, `server`, `source`
#
# Even if you don't use the `geo_point` mapping, the `[target][location]` field
# is still valid GeoJSON.
config :target, :validate => :string, :default => 'geoip'
# `geo` fields are not expected to be used directly at the root of the events
config :target, :validate => :string

# GeoIP lookup is surprisingly expensive. This filter uses an cache to take advantage of the fact that
# IPs agents are often found adjacent to one another in log files and rarely have a random distribution.
Expand All @@ -89,7 +94,18 @@ class LogStash::Filters::GeoIP < LogStash::Filters::Base
config :tag_on_failure, :validate => :array, :default => ["_geoip_lookup_failure"]

public

ECS_TARGET_FIELD = %w{
client
destination
host
observer
server
source
}.map(&:freeze).freeze

def register
setup_target_field
setup_filter(select_database_path)
end

Expand All @@ -108,10 +124,29 @@ def tag_unsuccessful_lookup(event)
@tag_on_failure.each{|tag| event.tag(tag)}
end

def setup_target_field
if ecs_compatibility == :disabled
@target ||= 'geoip'
else
@target ||= auto_target_from_source!
# normalize top-level fields to not be bracket-wrapped
normalized_target = @target.gsub(/\A\[([^\[\]]+)\]\z/,'\1')
logger.warn("ECS expect `target` value `#{normalized_target}` in #{ECS_TARGET_FIELD}") unless ECS_TARGET_FIELD.include?(normalized_target)
end
end

def auto_target_from_source!
return @source[0...-4] if @source.end_with?('[ip]') && @source.length > 4

fail(LogStash::ConfigurationError, "GeoIP Filter in ECS-Compatiblity mode "\
"requires a `target` when `source` is not an `ip` sub-field, eg. [client][ip]")
end


def setup_filter(database_path)
@database = database_path
@logger.info("Using geoip database", :path => @database)
@geoipfilter = org.logstash.filters.GeoIPFilter.new(@source, @target, @fields, @database, @cache_size)
@geoipfilter = org.logstash.filters.geoip.GeoIPFilter.new(@source, @target, @fields, @database, @cache_size, ecs_compatibility.to_s)
end

def terminate_filter
Expand All @@ -125,7 +160,7 @@ def close
end

def select_database_path
vendor_path = ::File.expand_path("../../../vendor/", ::File.dirname(__FILE__))
vendor_path = ::File.expand_path(::File.join("..", "..", "..", "..", "vendor"), __FILE__)

if load_database_manager?
@database_manager = LogStash::Filters::Geoip::DatabaseManager.new(self, @database, @default_database_type, vendor_path)
Expand All @@ -137,7 +172,7 @@ def select_database_path

def load_database_manager?
begin
require_relative "#{LogStash::Environment::LOGSTASH_HOME}/x-pack/lib/filters/geoip/database_manager"
require_relative ::File.join(LogStash::Environment::LOGSTASH_HOME, "x-pack", "lib", "filters", "geoip", "database_manager")
true
rescue LoadError => e
@logger.info("DatabaseManager is not in classpath", :version => LOGSTASH_VERSION, :exception => e)
Expand Down
3 changes: 2 additions & 1 deletion logstash-filter-geoip.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-filter-geoip'
s.version = '7.0.1'
s.version = '7.1.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Adds geographical information about an IP address"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -22,6 +22,7 @@ Gem::Specification.new do |s|

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~>1.1'
s.add_development_dependency 'logstash-devutils'
s.add_development_dependency 'insist'
s.add_development_dependency 'benchmark-ips'
Expand Down
203 changes: 203 additions & 0 deletions spec/filters/geoip_ecs_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/filters/geoip"
require_relative 'test_helper'
require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper'

CITYDB = ::Dir.glob(::File.expand_path(::File.join("..", "..", "..", "vendor", "GeoLite2-City.mmdb"), __FILE__)).first
ASNDB = ::Dir.glob(::File.expand_path(::File.join("..", "..", "..", "vendor", "GeoLite2-ASN.mmdb"), __FILE__)).first

describe LogStash::Filters::GeoIP do
let(:options) { {} }
let(:plugin) { LogStash::Filters::GeoIP.new(options) }

describe "simple ip filter", :aggregate_failures do

context "when specifying the target", :ecs_compatibility_support do
ecs_compatibility_matrix(:disabled, :v1) do |ecs_select|

let(:ip) { "8.8.8.8" }
let(:event) { LogStash::Event.new("message" => ip) }
let(:target) { "server" }
let(:common_options) { {"source" => "message", "database" => CITYDB, "target" => target} }

before(:each) do
allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility)
plugin.register
end

context "with city database" do
let(:options) { common_options }

it "should return geo in target" do
plugin.filter(event)

expect( event.get ecs_select[disabled: "[#{target}][ip]", v1: "[#{target}][ip]"] ).to eq ip
expect( event.get ecs_select[disabled: "[#{target}][country_code2]", v1: "[#{target}][geo][country_iso_code]"] ).to eq 'US'
expect( event.get ecs_select[disabled: "[#{target}][country_name]", v1: "[#{target}][geo][country_name]"] ).to eq 'United States'
expect( event.get ecs_select[disabled: "[#{target}][continent_code]", v1: "[#{target}][geo][continent_code]"] ).to eq 'NA'
expect( event.get ecs_select[disabled: "[#{target}][location][lat]", v1: "[#{target}][geo][location][lat]"] ).to eq 37.751
expect( event.get ecs_select[disabled: "[#{target}][location][lon]", v1: "[#{target}][geo][location][lon]"] ).to eq -97.822

if ecs_select.active_mode == :disabled
expect( event.get "[#{target}][country_code3]" ).to eq 'US'
else
expect( event.get "[#{target}][geo][country_code3]" ).to be_nil
expect( event.get "[#{target}][country_code3]" ).to be_nil
end
end
end


context "with ASN database" do
let(:options) { common_options.merge({"database" => ASNDB}) }

it "should return geo in target" do
plugin.filter(event)

expect( event.get ecs_select[disabled: "[#{target}][ip]", v1: "[#{target}][ip]"] ).to eq ip
expect( event.get ecs_select[disabled: "[#{target}][asn]", v1: "[#{target}][as][number]"] ).to eq 15169
expect( event.get ecs_select[disabled: "[#{target}][as_org]", v1: "[#{target}][as][organization][name]"] ).to eq "Google LLC"
end
end

context "with customize fields" do
let(:fields) { ["continent_name", "timezone"] }
let(:options) { common_options.merge({"fields" => fields}) }

it "should return fields" do
plugin.filter(event)

expect( event.get ecs_select[disabled: "[#{target}][ip]", v1: "[#{target}][ip]"] ).to be_nil
expect( event.get ecs_select[disabled: "[#{target}][country_code2]", v1: "[#{target}][geo][country_iso_code]"] ).to be_nil
expect( event.get ecs_select[disabled: "[#{target}][country_name]", v1: "[#{target}][geo][country_name]"] ).to be_nil
expect( event.get ecs_select[disabled: "[#{target}][continent_code]", v1: "[#{target}][geo][continent_code]"] ).to be_nil
expect( event.get ecs_select[disabled: "[#{target}][location][lat]", v1: "[#{target}][geo][location][lat]"] ).to be_nil
expect( event.get ecs_select[disabled: "[#{target}][location][lon]", v1: "[#{target}][geo][location][lon]"] ).to be_nil

expect( event.get ecs_select[disabled: "[#{target}][continent_name]", v1: "[#{target}][geo][continent_name]"] ).to eq "North America"
expect( event.get ecs_select[disabled: "[#{target}][timezone]", v1: "[#{target}][geo][timezone]"] ).to eq "America/Chicago"
end
end

end
end

context "setup target field" do
let(:ip) { "8.8.8.8" }
let(:event) { LogStash::Event.new("message" => ip) }
let(:common_options) { {"source" => "message", "database" => CITYDB} }

context "ECS disabled" do
before do
allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(:disabled)
plugin.register
plugin.filter(event)
end

context "`target` is unset" do
let(:options) { common_options }
it "should use 'geoip'" do
expect( event.get "[geoip][ip]" ).to eq ip
end
end

context "`target` is set" do
let(:target) { 'host' }
let(:options) { common_options.merge({"target" => target}) }
it "should use `target`" do
expect( event.get "[#{target}][ip]" ).to eq ip
end
end
end

context "ECS mode" do
before do
allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(:v1)
end

context "`target` is unset" do

context "`source` end with [ip]" do
let(:event) { LogStash::Event.new("host" => {"ip" => ip}) }
let(:options) { common_options.merge({"source" => "[host][ip]"}) }

it "should use source's parent as target" do
plugin.register
plugin.filter(event)
expect( event.get "[host][geo][country_iso_code]" ).to eq 'US'
end
end

context "`source` end with [ip] but `target` does not match ECS template" do
let(:event) { LogStash::Event.new("hostname" => {"ip" => ip}) }
let(:options) { common_options.merge({"source" => "[hostname][ip]"}) }

it "should use source's parent as target with warning" do
expect(plugin.logger).to receive(:warn).with(/ECS expect `target`/)
plugin.register
plugin.filter(event)
expect( event.get "[hostname][geo][country_iso_code]" ).to eq 'US'
end
end

context "`source` == [ip]" do
let(:event) { LogStash::Event.new("ip" => ip) }
let(:options) { common_options.merge({"source" => "[ip]"}) }

it "should raise error to require `target`" do
expect { plugin.register }.to raise_error LogStash::ConfigurationError, /requires a `target`/
end
end

context "`source` not end with [ip]" do
let(:event) { LogStash::Event.new("host_ip" => ip) }
let(:options) { common_options.merge({"source" => "host_ip"}) }

it "should raise error to require `target`" do
expect { plugin.register }.to raise_error LogStash::ConfigurationError, /requires a `target`/
end
end
end

context "`target` is set" do
let(:event) { LogStash::Event.new("client" => {"ip" => ip}) }
let(:options) { common_options.merge({"source" => "[client][ip]", "target" => target}) }

context "`target` matches ECS template" do
let(:target) { 'host' }

it "should use `target`" do
plugin.register
plugin.filter(event)
expect( event.get "[#{target}][geo][country_iso_code]" ).to eq 'US'
end
end

context "`target` in canonical field reference syntax matches ECS template" do
let(:target) { '[host]' }

it "should normalize and use `target`" do
expect(plugin.logger).to receive(:warn).never
plugin.register
plugin.filter(event)
expect( event.get "[host][geo][country_iso_code]" ).to eq 'US'
end
end

context "`target` does not match ECS template" do
let(:target) { 'host_ip' }

it "should use `target` with warning" do
expect(plugin.logger).to receive(:warn).with(/ECS expect `target`/)
plugin.register
plugin.filter(event)
expect( event.get "[#{target}][geo][country_iso_code]" ).to eq 'US'
end
end
end
end
end

end
end
4 changes: 2 additions & 2 deletions spec/filters/geoip_offline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
require "insist"
require "logstash/filters/geoip"

CITYDB = ::Dir.glob(::File.expand_path("../../vendor/", ::File.dirname(__FILE__))+"/GeoLite2-City.mmdb").first
ASNDB = ::Dir.glob(::File.expand_path("../../vendor/", ::File.dirname(__FILE__))+"/GeoLite2-ASN.mmdb").first
CITYDB = ::Dir.glob(::File.expand_path(::File.join("..", "..", "..", "vendor", "GeoLite2-City.mmdb"), __FILE__)).first
ASNDB = ::Dir.glob(::File.expand_path(::File.join("..", "..", "..", "vendor", "GeoLite2-ASN.mmdb"), __FILE__)).first

describe LogStash::Filters::GeoIP do
describe "defaults" do
Expand Down
Loading

0 comments on commit f9de15b

Please sign in to comment.