Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IPFIX Export support for Kubernetes transformations #338

Merged
merged 27 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b5b1fb2
Add export support of IPFIX for basic flow metrics
praveingk Nov 18, 2022
f3e3b7a
Ipfix export support options
praveingk Nov 24, 2022
53f656d
Changes in vendor
praveingk Dec 6, 2022
cbf81ee
Upgrade go-ipfix to recent release
praveingk Dec 6, 2022
51a2f5d
a
praveingk Dec 6, 2022
233caf1
IPFIX example configurations
praveingk Dec 6, 2022
f584969
Add Custom enterprise to API and handling
praveingk Dec 6, 2022
5831fb5
Remove debug prints
praveingk Dec 6, 2022
e14cd23
Change log declaration
praveingk Dec 6, 2022
66b8e27
Vendor changes after rebase
praveingk Dec 6, 2022
9af504d
Add Validation and defaults for ipfix config
praveingk Dec 8, 2022
8bea352
Cache entities for performance
praveingk Dec 15, 2022
8461c89
Merge branch 'main' of https://github.com/praveingk/flowlogs-pipeline…
praveingk Dec 15, 2022
6c3f659
Correct ipfix-dump
praveingk Jan 12, 2023
98d7780
Update pkg/pipeline/write/write_ipfix.go
praveingk Jan 12, 2023
fbc8cd1
Fix v6 sending data record
praveingk Jan 12, 2023
9616a0e
Merge branch 'main' of https://github.com/praveingk/flowlogs-pipeline…
praveingk Jan 12, 2023
a6f8cb7
Correct capitalized protocol value
praveingk Jan 12, 2023
417cd72
Update pkg/api/write_ipfix.go
praveingk Jan 12, 2023
5867146
Update pkg/api/write_ipfix.go
praveingk Jan 12, 2023
96c5abd
Handle error in loading custom registry
praveingk Jan 12, 2023
397f5d1
Fix corner cases of nil condition
praveingk Jan 18, 2023
40bd9fc
Fix typo
praveingk Jan 19, 2023
d88371c
Revert "Fix typo"
praveingk Jan 19, 2023
0a0679d
Update pkg/pipeline/write/write_ipfix.go
praveingk Jan 19, 2023
bdd9d1b
Merge branch 'main' into main
praveingk Jan 20, 2023
ce66d6c
Fixing lint error due to merge
praveingk Jan 23, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ require (
github.com/spf13/viper v1.10.1
github.com/stretchr/testify v1.8.0
github.com/vladimirvivien/gexe v0.1.1
github.com/vmware/go-ipfix v0.5.12
github.com/vmware/go-ipfix v0.5.13
golang.org/x/net v0.0.0-20220722155237-a158d28d115b
google.golang.org/grpc v1.45.0
google.golang.org/protobuf v1.28.0
google.golang.org/protobuf v1.28.1
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.24.0
k8s.io/apimachinery v0.24.0
Expand Down Expand Up @@ -79,6 +79,10 @@ require (
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pion/dtls/v2 v2.0.3 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/transport v0.10.1 // indirect
github.com/pion/udp v0.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
Expand All @@ -96,6 +100,7 @@ require (
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac // indirect
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -779,10 +779,14 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pion/dtls/v2 v2.0.3 h1:3qQ0s4+TXD00rsllL8g8KQcxAs+Y/Z6oz618RXX6p14=
github.com/pion/dtls/v2 v2.0.3/go.mod h1:TUjyL8bf8LH95h81Xj7kATmzMRt29F/4lxpIPj2Xe4Y=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/transport v0.10.0/go.mod h1:BnHnUipd0rZQyTVB2SBGojFHT9CBt5C5TcsJSQGkvSE=
github.com/pion/transport v0.10.1 h1:2W+yJT+0mOQ160ThZYUx5Zp2skzshiNgxrNE9GUfhJM=
github.com/pion/transport v0.10.1/go.mod h1:PBis1stIILMiis0PewDw91WJeLJkyIMcEk+DwKOzf4A=
github.com/pion/udp v0.1.0 h1:uGxQsNyrqG3GLINv36Ff60covYmfrLoxzwnCsIYspXI=
github.com/pion/udp v0.1.0/go.mod h1:BPELIjbwE9PRbd/zxI/KYBnbo7B6+oA6YuEaNE8lths=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -941,6 +945,8 @@ github.com/vladimirvivien/gexe v0.1.1 h1:2A0SBaOSKH+cwLVdt6H+KkHZotZWRNLlWygANGw
github.com/vladimirvivien/gexe v0.1.1/go.mod h1:LHQL00w/7gDUKIak24n801ABp8C+ni6eBht9vGVst8w=
github.com/vmware/go-ipfix v0.5.12 h1:mqQknlvnvDY25apPNy9c27ri3FMDFIhzvO68Kk5Qp58=
github.com/vmware/go-ipfix v0.5.12/go.mod h1:yzbG1rv+yJ8GeMrRm+MDhOV3akygNZUHLhC1pDoD2AY=
github.com/vmware/go-ipfix v0.5.13 h1:LtuJVf38XCghUN+WaI9OwQ1U7yim/pcmxz0PzdCqUnQ=
github.com/vmware/go-ipfix v0.5.13/go.mod h1:YqAPuFn4UMdiJVUI5YGXtrSmqi+lNMx2jewYOUryuws=
github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw=
Expand Down Expand Up @@ -1364,6 +1370,7 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f h1:GGU+dLjvlC3qDwqYgL6UgRmHXhOOgns0bZu2Ty5mm6U=
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gomodules.xyz/jsonpatch/v2 v2.2.0 h1:4pT439QV83L+G9FkcCriY6EkpcK6r6bK+A5FBUMI7qY=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
Expand Down Expand Up @@ -1534,6 +1541,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 h1:FVCohIoYO7IJoDDVpV2pdq7SgrMH6wHnuTyrdrxJNoY=
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0/go.mod h1:OdE7CF6DbADk7lN8LIKRzRJTTZXIjtWgA5THM5lhBAw=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
Expand Down
Binary file added hack/examples/ipfix-collector/ipfix-collector
Binary file not shown.
183 changes: 183 additions & 0 deletions hack/examples/ipfix-collector/ipfix-dump.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package main
praveingk marked this conversation as resolved.
Show resolved Hide resolved

import (
"bytes"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

ipfixCollector "github.com/vmware/go-ipfix/pkg/collector"
"github.com/vmware/go-ipfix/pkg/entities"
"github.com/vmware/go-ipfix/pkg/registry"
)

const (
hostPortIPv4 = ":9998"
hostPortIPv6 = "[::1]:0"
)

var (
transportType = flag.String("transport", "tcp", "transport type :tcp/udp")
)

const (
// SevoneEnterpriseID is the enterprise ID for SevOne Information Elements
SevoneEnterpriseID uint32 = 27207
)

func loadSevoneRegistry() error {
err := registry.InitNewRegistry(SevoneEnterpriseID)
if err != nil {
fmt.Printf("Failed Initialization")
}
err = registry.RegisterIE((*entities.NewInfoElement("sourcePodNamespace", 7733, 13, SevoneEnterpriseID, 65535)), SevoneEnterpriseID)
praveingk marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
fmt.Printf("Failed to register element")
return err
}
err = registry.RegisterIE((*entities.NewInfoElement("sourcePodName", 7734, 13, SevoneEnterpriseID, 65535)), SevoneEnterpriseID)
if err != nil {
fmt.Printf("Failed to register element")
return err
}
err = registry.RegisterIE((*entities.NewInfoElement("destinationPodNamespace", 7735, 13, SevoneEnterpriseID, 65535)), SevoneEnterpriseID)
if err != nil {
fmt.Printf("Failed to register element")
return err
}
err = registry.RegisterIE((*entities.NewInfoElement("destinationPodName", 7736, 13, SevoneEnterpriseID, 65535)), SevoneEnterpriseID)
if err != nil {
fmt.Printf("Failed to register element")
return err
}
err = registry.RegisterIE((*entities.NewInfoElement("sourceNodeName", 7737, 13, SevoneEnterpriseID, 65535)), SevoneEnterpriseID)
if err != nil {
fmt.Printf("Failed to register element")
return err
}
return nil
}
func printIPFIXMessage(msg *entities.Message) {
var buf bytes.Buffer
fmt.Fprint(&buf, "\nIPFIX-HDR:\n")
fmt.Fprintf(&buf, " version: %v, Message Length: %v\n", msg.GetVersion(), msg.GetMessageLen())
fmt.Fprintf(&buf, " Exported Time: %v (%v)\n", msg.GetExportTime(), time.Unix(int64(msg.GetExportTime()), 0))
fmt.Fprintf(&buf, " Sequence No.: %v, Observation Domain ID: %v\n", msg.GetSequenceNum(), msg.GetObsDomainID())

set := msg.GetSet()
if set.GetSetType() == entities.Template {
fmt.Fprint(&buf, "TEMPLATE SET:\n")
for i, record := range set.GetRecords() {
fmt.Fprintf(&buf, " TEMPLATE RECORD-%d:\n", i)
for _, ie := range record.GetOrderedElementList() {
fmt.Printf("%+v", ie)
elem := ie.GetInfoElement()
fmt.Fprintf(&buf, " %s: len=%d (enterprise ID = %d) \n", elem.Name, elem.Len, elem.EnterpriseId)
}
}
} else {
fmt.Fprint(&buf, "DATA SET:\n")
for i, record := range set.GetRecords() {
fmt.Fprintf(&buf, " DATA RECORD-%d:\n", i)
for _, ie := range record.GetOrderedElementList() {
elem := ie.GetInfoElement()
switch elem.DataType {
case entities.Unsigned8:
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetUnsigned8Value())
case entities.Unsigned16:
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetUnsigned16Value())
case entities.Unsigned32:
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetUnsigned32Value())
case entities.Unsigned64:
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetUnsigned64Value())
case entities.Signed8:
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetSigned8Value())
case entities.Signed16:
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetSigned16Value())
case entities.Signed32:
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetSigned32Value())
case entities.Signed64:
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetSigned64Value())
case entities.Float32:
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetFloat32Value())
case entities.Float64:
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetFloat64Value())
case entities.Boolean:
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetBooleanValue())
case entities.DateTimeSeconds:
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetUnsigned32Value())
case entities.DateTimeMilliseconds:
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetUnsigned64Value())
case entities.DateTimeMicroseconds, entities.DateTimeNanoseconds:
err := fmt.Errorf("API does not support micro and nano seconds types yet")
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, err)
case entities.MacAddress:
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetMacAddressValue())
case entities.Ipv4Address, entities.Ipv6Address:
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetIPAddressValue())
case entities.String:
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetStringValue())
default:
err := fmt.Errorf("API supports only valid information elements with datatypes given in RFC7011")
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, err)
}
}
}
}
log.Printf(buf.String())
}

func signalHandler(stopCh chan struct{}, messageReceived chan *entities.Message) {
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

for {
select {
case msg := <-messageReceived:
fmt.Printf("Got a Message\n")
printIPFIXMessage(msg)
case <-signalCh:
close(stopCh)
return
}
}
}

func main() {
log.SetFlags(0)
flag.Parse()

// Create exporter using local server info
var input = ipfixCollector.CollectorInput{
Address: hostPortIPv4,
Protocol: *transportType,
MaxBufferSize: 1024,
}
registry.LoadRegistry()
loadSevoneRegistry()
praveingk marked this conversation as resolved.
Show resolved Hide resolved
cp, err := ipfixCollector.InitCollectingProcess(input)
if err != nil {
log.Fatalf("UDP Collecting Process does not start correctly: %v", err)
}
// Start listening to connections and receiving messages.
messageReceived := make(chan *entities.Message)
go func() {
go cp.Start()
msgChan := cp.GetMsgChan()
for message := range msgChan {
messageReceived <- message
}
}()

stopCh := make(chan struct{})
go signalHandler(stopCh, messageReceived)

<-stopCh
// Stop the collector process
cp.Stop()
log.Printf("Stopping IPFIX collector")
}
33 changes: 33 additions & 0 deletions hack/examples/ipfix-manifests/enrich-ipfix-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
log-level: info
pipeline:
- name: ingest
- name: enrich
follows: ingest
- name: write_ipfix
follows: enrich
parameters:
- name: ingest
ingest:
type: grpc
grpc:
port: 9999
- name: enrich
transform:
type: network
network:
rules:
- input: SrcAddr
output: SrcK8S
type: "add_kubernetes"
- input: DstAddr
output: DstK8S
type: "add_kubernetes"
- name: write_ipfix
write:
type: ipfix
ipfix:
targetHost: 127.0.0.1
targetPort: 9998
transport: tcp
enterpriseID: 27207

34 changes: 34 additions & 0 deletions hack/examples/ipfix-manifests/manifests/agent.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: netobserv-ebpf-agent
labels:
k8s-app: netobserv-ebpf-agent
spec:
selector:
matchLabels:
k8s-app: netobserv-ebpf-agent
template:
metadata:
labels:
k8s-app: netobserv-ebpf-agent
spec:
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
containers:
- name: netobserv-ebpf-agent
image: localhost/ebpf-agent:test
securityContext:
privileged: true
runAsUser: 0
env:
- name: CACHE_ACTIVE_TIMEOUT
value: 200ms
- name: LOG_LEVEL
value: debug
- name: FLOWS_TARGET_HOST
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: FLOWS_TARGET_PORT
value: "9999"
70 changes: 70 additions & 0 deletions hack/examples/ipfix-manifests/manifests/flp.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: flp
labels:
k8s-app: flp
spec:
selector:
matchLabels:
k8s-app: flp
template:
metadata:
labels:
k8s-app: flp
spec:
serviceAccountName: ebpf-agent-test
containers:
- name: flp
image: quay.io/praveingk/flowlogs-pipeline:latest
ports:
- containerPort: 9999
hostPort: 9999
args:
- --config=/etc/flp/config.yaml
volumeMounts:
- mountPath: /etc/flp
name: config-volume
volumes:
- name: config-volume
configMap:
name: flp-config
---
apiVersion: v1
kind: ConfigMap
metadata:
name: flp-config
data:
config.yaml: |
log-level: debug
pipeline:
- name: ingest
- name: enrich
follows: ingest
- name: write_ipfix
follows: enrich
parameters:
- name: ingest
ingest:
type: grpc
grpc:
port: 9999
- name: enrich
transform:
type: network
network:
rules:
- input: SrcAddr
output: SrcK8S
type: "add_kubernetes"
- input: DstAddr
output: DstK8S
type: "add_kubernetes"
- name: write_ipfix
write:
type: ipfix
ipfix:
targetHost: 10.244.0.5
targetPort: 9998
transport: tcp
enterpriseID: 27207
Loading