Skip to content

Commit

Permalink
added otel support inside transform-network
Browse files Browse the repository at this point in the history
  • Loading branch information
KalmanMeth committed Nov 20, 2023
1 parent d9e4f3b commit 4a75737
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 31 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ All the kubernetes fields will be named by appending `output` value
In addition, if the `parameters` value is not empty, fields with kubernetes labels
will be generated, and named by appending `parameters` value to the label keys.

If `assignee` is set to `otel` then the output fields of `add_kubernetes` will be produced in opentelemetry format.

> Note: kubernetes connection is done using the first available method:
> 1. configuration parameter `KubeConfigPath` (in the example above `/tmp/config`) or
> 2. using `KUBECONFIG` environment variable
Expand Down
12 changes: 6 additions & 6 deletions pkg/pipeline/transform/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ const (
kubeConfigEnvVariable = "KUBECONFIG"
syncTime = 10 * time.Minute
IndexIP = "byIP"
typeNode = "Node"
typePod = "Pod"
typeService = "Service"
TypeNode = "Node"
TypePod = "Pod"
TypeService = "Service"
)

type kubeDataInterface interface {
Expand Down Expand Up @@ -204,7 +204,7 @@ func (k *KubeData) initNodeInformer(informerFactory informers.SharedInformerFact
Labels: node.Labels,
},
ips: ips,
Type: typeNode,
Type: TypeNode,
// We duplicate HostIP and HostName information to simplify later filtering e.g. by
// Host IP, where we want to get all the Pod flows by src/dst host, but also the actual
// host-to-host flows by the same field.
Expand Down Expand Up @@ -244,7 +244,7 @@ func (k *KubeData) initPodInformer(informerFactory informers.SharedInformerFacto
Labels: pod.Labels,
OwnerReferences: pod.OwnerReferences,
},
Type: typePod,
Type: TypePod,
HostIP: pod.Status.HostIP,
ips: ips,
}, nil
Expand Down Expand Up @@ -281,7 +281,7 @@ func (k *KubeData) initServiceInformer(informerFactory informers.SharedInformerF
Namespace: svc.Namespace,
Labels: svc.Labels,
},
Type: typeService,
Type: TypeService,
ips: ips,
}, nil
}); err != nil {
Expand Down
89 changes: 64 additions & 25 deletions pkg/pipeline/transform/transform_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,31 +98,7 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo
}
outputEntry[rule.Output] = serviceName
case api.OpAddKubernetes:
kubeInfo, err := kubernetes.Data.GetInfo(fmt.Sprintf("%s", outputEntry[rule.Input]))
if err != nil {
logrus.WithError(err).Tracef("can't find kubernetes info for IP %v", outputEntry[rule.Input])
continue
}
// NETOBSERV-666: avoid putting empty namespaces or Loki aggregation queries will
// differentiate between empty and nil namespaces.
if kubeInfo.Namespace != "" {
outputEntry[rule.Output+"_Namespace"] = kubeInfo.Namespace
}
outputEntry[rule.Output+"_Name"] = kubeInfo.Name
outputEntry[rule.Output+"_Type"] = kubeInfo.Type
outputEntry[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name
outputEntry[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type
if rule.Parameters != "" {
for labelKey, labelValue := range kubeInfo.Labels {
outputEntry[rule.Parameters+"_"+labelKey] = labelValue
}
}
if kubeInfo.HostIP != "" {
outputEntry[rule.Output+"_HostIP"] = kubeInfo.HostIP
if kubeInfo.HostName != "" {
outputEntry[rule.Output+"_HostName"] = kubeInfo.HostName
}
}
fillInK8s(outputEntry, rule)
case api.OpReinterpretDirection:
reinterpretDirection(outputEntry, &n.DirectionInfo)
case api.OpAddIPCategory:
Expand All @@ -143,6 +119,69 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo
return outputEntry, true
}

func fillInK8s(outputEntry config.GenericMap, rule api.NetworkTransformRule) {
kubeInfo, err := kubernetes.Data.GetInfo(fmt.Sprintf("%s", outputEntry[rule.Input]))
if err != nil {
logrus.WithError(err).Tracef("can't find kubernetes info for IP %v", outputEntry[rule.Input])
return
}
if rule.Assignee != "otel" {
// NETOBSERV-666: avoid putting empty namespaces or Loki aggregation queries will
// differentiate between empty and nil namespaces.
if kubeInfo.Namespace != "" {
outputEntry[rule.Output+"_Namespace"] = kubeInfo.Namespace
}
outputEntry[rule.Output+"_Name"] = kubeInfo.Name
outputEntry[rule.Output+"_Type"] = kubeInfo.Type
outputEntry[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name
outputEntry[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type
if rule.Parameters != "" {
for labelKey, labelValue := range kubeInfo.Labels {
outputEntry[rule.Parameters+"_"+labelKey] = labelValue
}
}
if kubeInfo.HostIP != "" {
outputEntry[rule.Output+"_HostIP"] = kubeInfo.HostIP
if kubeInfo.HostName != "" {
outputEntry[rule.Output+"_HostName"] = kubeInfo.HostName
}
}
} else {
// NOTE: Some of these fields are taken from opentelemetry specs.
// See https://opentelemetry.io/docs/specs/semconv/resource/k8s/
// Other fields (not specified in the specs) are named similarly
if kubeInfo.Namespace != "" {
outputEntry[rule.Output+"k8s.namespace.name"] = kubeInfo.Namespace
}
switch kubeInfo.Type {
case kubernetes.TypeNode:
outputEntry[rule.Output+"k8s.node.name"] = kubeInfo.Name
outputEntry[rule.Output+"k8s.node.uid"] = kubeInfo.UID
case kubernetes.TypePod:
outputEntry[rule.Output+"k8s.pod.name"] = kubeInfo.Name
outputEntry[rule.Output+"k8s.pod.uid"] = kubeInfo.UID
case kubernetes.TypeService:
outputEntry[rule.Output+"k8s.service.name"] = kubeInfo.Name
outputEntry[rule.Output+"k8s.service.uid"] = kubeInfo.UID
}
outputEntry[rule.Output+"k8s.name"] = kubeInfo.Name
outputEntry[rule.Output+"k8s.type"] = kubeInfo.Type
outputEntry[rule.Output+"k8s.owner.name"] = kubeInfo.Owner.Name
outputEntry[rule.Output+"k8s.owner.type"] = kubeInfo.Owner.Type
if rule.Parameters != "" {
for labelKey, labelValue := range kubeInfo.Labels {
outputEntry[rule.Parameters+"."+labelKey] = labelValue
}
}
if kubeInfo.HostIP != "" {
outputEntry[rule.Output+"k8s.host.ip"] = kubeInfo.HostIP
if kubeInfo.HostName != "" {
outputEntry[rule.Output+"k8s.host.name"] = kubeInfo.HostName
}
}
}
}

func (n *Network) categorizeIP(ip net.IP) string {
if ip != nil {
for _, subnetCat := range n.categories {
Expand Down

0 comments on commit 4a75737

Please sign in to comment.