Skip to content

Commit

Permalink
work on populate netbox inventory passively (idaholab#135): reorganiz…
Browse files Browse the repository at this point in the history
…ing logstash pipeline so we have the data we need at autopopulate time
  • Loading branch information
mmguero committed Jun 1, 2023
1 parent 1664c46 commit 2aed1b3
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 144 deletions.
109 changes: 0 additions & 109 deletions logstash/pipelines/enrichment/11_lookups.conf
Original file line number Diff line number Diff line change
Expand Up @@ -454,115 +454,6 @@ filter {
}
}

################################################################################################
# Do enrichment based on NetBox lookups:
# - source.ip -> source.device and source.segment
# - destination.ip -> destination.device and destination.segment
# - TODO: source.mac -> source.device
# - TODO: destination.mac -> destination.device
# The LOGSTASH_NETBOX_ENRICHMENT environment variable is checked inside netbox_enrich.rb
# and will short-circuit unles this feature is enabled.
#
# Enrich zeek conn.log, notice.log, weird.log, signatures.log, software.log, known*.log and all non-zeek data sources

if (([event][provider] != "zeek") or
([event][dataset] == "conn") or
([event][dataset] == "notice") or
([event][dataset] == "weird") or
([event][dataset] == "signatures") or
([event][dataset] == "software") or
([event][dataset] =~ /^known/)) {
if ([source][ip]) and
(([network][direction] == "internal") or ([network][direction] == "outbound")) {
ruby {
id => "ruby_netbox_enrich_source_ip_device"
path => "/usr/share/logstash/malcolm-ruby/netbox_enrich.rb"
script_params => {
"source" => "[source][ip]"
"target" => "[source][device]"
"lookup_type" => "ip_device"
"lookup_site_env" => "NETBOX_DEFAULT_SITE"
"lookup_service" => "false"
"verbose_env" => "LOGSTASH_NETBOX_ENRICHMENT_VERBOSE"
"autopopulate_env" => "LOGSTASH_NETBOX_AUTO_POPULATE"
"netbox_token_env" => "SUPERUSER_API_TOKEN"
}
}
ruby {
id => "ruby_netbox_enrich_source_ip_segment"
path => "/usr/share/logstash/malcolm-ruby/netbox_enrich.rb"
script_params => {
"source" => "[source][ip]"
"target" => "[source][segment]"
"lookup_type" => "ip_vrf"
"lookup_site_env" => "NETBOX_DEFAULT_SITE"
"verbose_env" => "LOGSTASH_NETBOX_ENRICHMENT_VERBOSE"
"autopopulate_env" => "LOGSTASH_NETBOX_AUTO_POPULATE"
"netbox_token_env" => "SUPERUSER_API_TOKEN"
}
}
}
if ([destination][ip]) and
(([network][direction] == "internal") or ([network][direction] == "inbound")) {
ruby {
id => "ruby_netbox_enrich_destination_ip_device"
path => "/usr/share/logstash/malcolm-ruby/netbox_enrich.rb"
script_params => {
"source" => "[destination][ip]"
"target" => "[destination][device]"
"lookup_type" => "ip_device"
"lookup_site_env" => "NETBOX_DEFAULT_SITE"
"lookup_service_env" => "LOGSTASH_NETBOX_ENRICHMENT_LOOKUP_SERVICE"
"lookup_service_port_source" => "[destination][port]"
"verbose_env" => "LOGSTASH_NETBOX_ENRICHMENT_VERBOSE"
"netbox_token_env" => "SUPERUSER_API_TOKEN"
}
}
ruby {
id => "ruby_netbox_enrich_destination_ip_segment"
path => "/usr/share/logstash/malcolm-ruby/netbox_enrich.rb"
script_params => {
"source" => "[destination][ip]"
"target" => "[destination][segment]"
"lookup_type" => "ip_vrf"
"lookup_site_env" => "NETBOX_DEFAULT_SITE"
"verbose_env" => "LOGSTASH_NETBOX_ENRICHMENT_VERBOSE"
"netbox_token_env" => "SUPERUSER_API_TOKEN"
}
}
}
}

# collect site, role, manufacturer and device_type in "related." segment is merged as network.name in 20_enriched_to_ecs.conf
if ([source][device][site]) { mutate { id => "mutate_merge_source_device_site_related"
merge => { "[related][site]" => "[source][device][site]" } } }
if ([destination][device][site]) { mutate { id => "mutate_merge_destination_device_site_related"
merge => { "[related][site]" => "[destination][device][site]" } } }
if ([source][segment][site]) { mutate { id => "mutate_merge_source_segment_site_related"
merge => { "[related][site]" => "[source][segment][site]" } } }
if ([destination][segment][site]) { mutate { id => "mutate_merge_destination_segment_site_related"
merge => { "[related][site]" => "[destination][segment][site]" } } }
if ([source][device][role]) { mutate { id => "mutate_merge_source_device_role_related"
merge => { "[related][role]" => "[source][device][role]" } } }
if ([destination][device][role]) { mutate { id => "mutate_merge_destination_device_role_related"
merge => { "[related][role]" => "[destination][device][role]" } } }
if ([source][device][manufacturer]) { mutate { id => "mutate_merge_source_device_manufacturer_related"
merge => { "[related][manufacturer]" => "[source][device][manufacturer]" } } }
if ([destination][device][manufacturer]) { mutate { id => "mutate_merge_destination_device_manufacturer_related"
merge => { "[related][manufacturer]" => "[destination][device][manufacturer]" } } }
if ([source][device][device_type]) { mutate { id => "mutate_merge_source_device_type_related"
merge => { "[related][device_type]" => "[source][device][device_type]" } } }
if ([destination][device][device_type]) { mutate { id => "mutate_merge_destination_device_type_related"
merge => { "[related][device_type]" => "[destination][device][device_type]" } } }
if ([source][device][service]) { mutate { id => "mutate_merge_source_service_related"
merge => { "[related][service]" => "[source][device][service]" } } }
if ([destination][device][service]) { mutate { id => "mutate_merge_destination_service_related"
merge => { "[related][service]" => "[destination][device][service]" } } }
if ([source][device][name]) { mutate { id => "mutate_merge_source_device_name_related"
merge => { "[related][device_name]" => "[source][device][name]" } } }
if ([destination][device][name]) { mutate { id => "mutate_merge_destination_device_name_related"
merge => { "[related][device_name]" => "[destination][device][name]" } } }

################################################################################################

# tag ICS services from if not already tagged
Expand Down
16 changes: 0 additions & 16 deletions logstash/pipelines/enrichment/18_tags_finalize.conf

This file was deleted.

7 changes: 0 additions & 7 deletions logstash/pipelines/enrichment/20_enriched_to_ecs.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,6 @@ filter {
add_field => { "[network][iana_number]" => "%{[ipProtocol]}" } }
}

# network.name (based on info from [destination][segment][name] and [source][segment][name])
if ([destination][segment][name]) { mutate { id => "mutate_add_field_ecs_network_name_resp"
merge => { "[network][name]" => "[destination][segment][name]" } } }
if ([source][segment][name]) { mutate { id => "mutate_add_field_ecs_network_name_orig"
merge => { "[network][name]" => "[source][segment][name]" } } }


# ECS - various -> related.ip (all IP-type fields get rolled up into related.ip)
if ([source][ip]) { mutate { id => "mutate_merge_field_related_ip_source_ip"
merge => { "[related][ip]" => "[source][ip]" } } }
Expand Down
129 changes: 129 additions & 0 deletions logstash/pipelines/enrichment/21_netbox.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
filter {


################################################################################################
# Do enrichment based on NetBox lookups:
# - source.ip -> source.device and source.segment
# - destination.ip -> destination.device and destination.segment
# - TODO: source.mac -> source.device
# - TODO: destination.mac -> destination.device
# The LOGSTASH_NETBOX_ENRICHMENT environment variable is checked inside netbox_enrich.rb
# and will short-circuit unles this feature is enabled.
#
# Enrich zeek conn.log, notice.log, weird.log, signatures.log, software.log, known*.log and all non-zeek data sources

if (([event][provider] != "zeek") or
([event][dataset] == "conn") or
([event][dataset] == "notice") or
([event][dataset] == "weird") or
([event][dataset] == "signatures") or
([event][dataset] == "software") or
([event][dataset] =~ /^known/)) {
if ([source][ip]) and
(([network][direction] == "internal") or ([network][direction] == "outbound")) {
ruby {
id => "ruby_netbox_enrich_source_ip_segment"
path => "/usr/share/logstash/malcolm-ruby/netbox_enrich.rb"
script_params => {
"source" => "[source][ip]"
"target" => "[source][segment]"
"lookup_type" => "ip_vrf"
"lookup_site_env" => "NETBOX_DEFAULT_SITE"
"verbose_env" => "LOGSTASH_NETBOX_ENRICHMENT_VERBOSE"
"autopopulate_env" => "LOGSTASH_NETBOX_AUTO_POPULATE"
"netbox_token_env" => "SUPERUSER_API_TOKEN"
}
}
ruby {
id => "ruby_netbox_enrich_source_ip_device"
path => "/usr/share/logstash/malcolm-ruby/netbox_enrich.rb"
script_params => {
"source" => "[source][ip]"
"target" => "[source][device]"
"lookup_type" => "ip_device"
"lookup_site_env" => "NETBOX_DEFAULT_SITE"
"lookup_service" => "false"
"verbose_env" => "LOGSTASH_NETBOX_ENRICHMENT_VERBOSE"
"netbox_token_env" => "SUPERUSER_API_TOKEN"
# these are used for autopopulation only, not lookup/enrichment
"autopopulate_env" => "LOGSTASH_NETBOX_AUTO_POPULATE"
"source_hostname" => ""
"source_oui" => "[source][oui]"
"source_segment" => "[source][segment]"
}
}
}
if ([destination][ip]) and
(([network][direction] == "internal") or ([network][direction] == "inbound")) {
ruby {
id => "ruby_netbox_enrich_destination_ip_segment"
path => "/usr/share/logstash/malcolm-ruby/netbox_enrich.rb"
script_params => {
"source" => "[destination][ip]"
"target" => "[destination][segment]"
"lookup_type" => "ip_vrf"
"lookup_site_env" => "NETBOX_DEFAULT_SITE"
"verbose_env" => "LOGSTASH_NETBOX_ENRICHMENT_VERBOSE"
"netbox_token_env" => "SUPERUSER_API_TOKEN"
}
}
ruby {
id => "ruby_netbox_enrich_destination_ip_device"
path => "/usr/share/logstash/malcolm-ruby/netbox_enrich.rb"
script_params => {
"source" => "[destination][ip]"
"target" => "[destination][device]"
"lookup_type" => "ip_device"
"lookup_site_env" => "NETBOX_DEFAULT_SITE"
"lookup_service_env" => "LOGSTASH_NETBOX_ENRICHMENT_LOOKUP_SERVICE"
"lookup_service_port_source" => "[destination][port]"
"verbose_env" => "LOGSTASH_NETBOX_ENRICHMENT_VERBOSE"
"netbox_token_env" => "SUPERUSER_API_TOKEN"
# these are used for autopopulation only, not lookup/enrichment
"autopopulate_env" => "LOGSTASH_NETBOX_AUTO_POPULATE"
"source_hostname" => ""
"source_oui" => "[destination][oui]"
"source_segment" => "[destination][segment]"
}
}
}
}

# collect site, role, manufacturer and device_type in "related." segment is merged as network.name in 20_enriched_to_ecs.conf
if ([source][device][site]) { mutate { id => "mutate_merge_source_device_site_related"
merge => { "[related][site]" => "[source][device][site]" } } }
if ([destination][device][site]) { mutate { id => "mutate_merge_destination_device_site_related"
merge => { "[related][site]" => "[destination][device][site]" } } }
if ([source][segment][site]) { mutate { id => "mutate_merge_source_segment_site_related"
merge => { "[related][site]" => "[source][segment][site]" } } }
if ([destination][segment][site]) { mutate { id => "mutate_merge_destination_segment_site_related"
merge => { "[related][site]" => "[destination][segment][site]" } } }
if ([source][device][role]) { mutate { id => "mutate_merge_source_device_role_related"
merge => { "[related][role]" => "[source][device][role]" } } }
if ([destination][device][role]) { mutate { id => "mutate_merge_destination_device_role_related"
merge => { "[related][role]" => "[destination][device][role]" } } }
if ([source][device][manufacturer]) { mutate { id => "mutate_merge_source_device_manufacturer_related"
merge => { "[related][manufacturer]" => "[source][device][manufacturer]" } } }
if ([destination][device][manufacturer]) { mutate { id => "mutate_merge_destination_device_manufacturer_related"
merge => { "[related][manufacturer]" => "[destination][device][manufacturer]" } } }
if ([source][device][device_type]) { mutate { id => "mutate_merge_source_device_type_related"
merge => { "[related][device_type]" => "[source][device][device_type]" } } }
if ([destination][device][device_type]) { mutate { id => "mutate_merge_destination_device_type_related"
merge => { "[related][device_type]" => "[destination][device][device_type]" } } }
if ([source][device][service]) { mutate { id => "mutate_merge_source_service_related"
merge => { "[related][service]" => "[source][device][service]" } } }
if ([destination][device][service]) { mutate { id => "mutate_merge_destination_service_related"
merge => { "[related][service]" => "[destination][device][service]" } } }
if ([source][device][name]) { mutate { id => "mutate_merge_source_device_name_related"
merge => { "[related][device_name]" => "[source][device][name]" } } }
if ([destination][device][name]) { mutate { id => "mutate_merge_destination_device_name_related"
merge => { "[related][device_name]" => "[destination][device][name]" } } }

# network.name (based on info from [destination][segment][name] and [source][segment][name])
if ([destination][segment][name]) { mutate { id => "mutate_add_field_ecs_network_name_resp"
merge => { "[network][name]" => "[destination][segment][name]" } } }
if ([source][segment][name]) { mutate { id => "mutate_add_field_ecs_network_name_orig"
merge => { "[network][name]" => "[source][segment][name]" } } }

} # filter

13 changes: 13 additions & 0 deletions logstash/pipelines/enrichment/98_finalize.conf
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,17 @@ filter {
]
}

# remove tags we'd rather not see

mutate { id => "mutate_enrichment_tags_remove"
remove_tag => [ "beats_input_codec_plain_applied",
"beats_input_raw_event",
"_dateparsefailure",
"_grokparsefailure",
"_jsonparsefailure",
"_dissectfailure",
"_ouilookupfailure",
"_geoip_lookup_failure" ] }


}
41 changes: 29 additions & 12 deletions logstash/ruby/netbox_enrich.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,6 @@ def register(params)
end
@verbose = [1, true, '1', 'true', 't', 'on', 'enabled'].include?(_verbose_str.to_s.downcase)

# autopopulate - either specified directly or read from ENV via autopopulate_env
# false - do not autopopulate netbox inventory when uninventoried devices are observed
# true - autopopulate netbox inventory when uninventoried devices are observed (not recommended)
#
# For now this is only done for devices/virtual machines, not for services or network segments.
_autopopulate_str = params["autopopulate"]
_autopopulate_env = params["autopopulate_env"]
if _autopopulate_str.nil? and !_autopopulate_env.nil?
_autopopulate_str = ENV[_autopopulate_env]
end
@autopopulate = [1, true, '1', 'true', 't', 'on', 'enabled'].include?(_autopopulate_str.to_s.downcase)

# connection URL for netbox
@netbox_url = params.fetch("netbox_url", "http://netbox:8080/netbox/api").delete_suffix("/")
@netbox_url_suffix = "/netbox/api"
Expand All @@ -91,6 +79,28 @@ def register(params)

# hash of lookup types (from @lookup_type), each of which contains the respective looked-up values
@cache_hash = LruRedux::ThreadSafeCache.new(params.fetch("lookup_cache_size", 512))

# these are used for autopopulation only, not lookup/enrichment

# autopopulate - either specified directly or read from ENV via autopopulate_env
# false - do not autopopulate netbox inventory when uninventoried devices are observed
# true - autopopulate netbox inventory when uninventoried devices are observed (not recommended)
#
# For now this is only done for devices/virtual machines, not for services or network segments.
_autopopulate_str = params["autopopulate"]
_autopopulate_env = params["autopopulate_env"]
if _autopopulate_str.nil? and !_autopopulate_env.nil?
_autopopulate_str = ENV[_autopopulate_env]
end
@autopopulate = [1, true, '1', 'true', 't', 'on', 'enabled'].include?(_autopopulate_str.to_s.downcase)

# fields for device autopopulation
@source_hostname = params["source_hostname"]
@source_oui = params["source_oui"]
@source_segment = params["source_segment"]

# end of autopopulation arguments

end

def filter(event)
Expand All @@ -108,6 +118,10 @@ def filter(event)
_lookup_type = @lookup_type
_lookup_site = @lookup_site
_lookup_service_port = (@lookup_service ? event.get("#{@lookup_service_port_source}") : nil).to_i
_autopopulate = @autopopulate
_autopopulate_hostname = @autopopulate ? @source_hostname : nil
_autopopulate_oui = @autopopulate ? @source_oui : nil
_autopopulate_segment = @autopopulate ? @source_segment : nil
_result = @cache_hash.getset(_lookup_type){
LruRedux::TTL::ThreadSafeCache.new(@cache_size, @cache_ttl)
}.getset(_key){
Expand Down Expand Up @@ -209,6 +223,9 @@ def filter(event)
else
break
end
if _autopopulate and (_query[:offset] == 0)
# TODO: no results found, autopopulate enabled
end
end
rescue Faraday::Error
# give up aka do nothing
Expand Down

0 comments on commit 2aed1b3

Please sign in to comment.