diff --git a/controllers/flowcollector_controller.go b/controllers/flowcollector_controller.go index 7036a7c6c..7f8138170 100644 --- a/controllers/flowcollector_controller.go +++ b/controllers/flowcollector_controller.go @@ -6,8 +6,6 @@ import ( "net" "strings" - "github.com/netobserv/network-observability-operator/controllers/ebpf" - "github.com/netobserv/network-observability-operator/controllers/ovs" osv1alpha1 "github.com/openshift/api/console/v1alpha1" securityv1 "github.com/openshift/api/security/v1" appsv1 "k8s.io/api/apps/v1" @@ -25,7 +23,9 @@ import ( flowsv1alpha1 "github.com/netobserv/network-observability-operator/api/v1alpha1" "github.com/netobserv/network-observability-operator/controllers/consoleplugin" "github.com/netobserv/network-observability-operator/controllers/constants" + "github.com/netobserv/network-observability-operator/controllers/ebpf" "github.com/netobserv/network-observability-operator/controllers/flowlogspipeline" + "github.com/netobserv/network-observability-operator/controllers/ovs" "github.com/netobserv/network-observability-operator/controllers/reconcilers" "github.com/netobserv/network-observability-operator/pkg/discover" ) @@ -137,14 +137,22 @@ func (r *FlowCollectorReconciler) Reconcile(ctx context.Context, req ctrl.Reques } // OVS config map for CNO - ovsConfigController := ovs.NewFlowsConfigController(clientHelper, - ns, - desired.Spec.ClusterNetworkOperator.Namespace, - ovsFlowsConfigMapName, - r.lookupIP) - if err := ovsConfigController.Reconcile(ctx, desired); err != nil { - return ctrl.Result{}, - fmt.Errorf("failed to reconcile ovs-flows-config ConfigMap: %w", err) + if desired.Spec.ClusterNetworkOperator.Namespace != "" { + ovsConfigController := ovs.NewFlowsConfigCNOController(clientHelper, + ns, + desired.Spec.ClusterNetworkOperator.Namespace, + ovsFlowsConfigMapName, + r.lookupIP) + if err := ovsConfigController.Reconcile(ctx, desired); err != nil { + return ctrl.Result{}, + fmt.Errorf("failed to reconcile ovs-flows-config ConfigMap: %w", err) + } + } else { + ovsConfigController := ovs.NewFlowsConfigOVNKController(clientHelper, ns, r.lookupIP) + if err := ovsConfigController.Reconcile(ctx, desired); err != nil { + return ctrl.Result{}, + fmt.Errorf("failed to reconcile ovn-kubernetes DaemonSet: %w", err) + } } // eBPF agent diff --git a/controllers/flowlogspipeline/flp_objects.go b/controllers/flowlogspipeline/flp_objects.go index 55e4c5d12..ed7036f79 100644 --- a/controllers/flowlogspipeline/flp_objects.go +++ b/controllers/flowlogspipeline/flp_objects.go @@ -74,7 +74,7 @@ func (b *builder) deployment(configDigest string) *appsv1.Deployment { Selector: &metav1.LabelSelector{ MatchLabels: b.selector, }, - Template: b.podTemplate(configDigest), + Template: b.podTemplate(false, configDigest), }, } } @@ -90,12 +90,12 @@ func (b *builder) daemonSet(configDigest string) *appsv1.DaemonSet { Selector: &metav1.LabelSelector{ MatchLabels: b.selector, }, - Template: b.podTemplate(configDigest), + Template: b.podTemplate(true, configDigest), }, } } -func (b *builder) podTemplate(configDigest string) corev1.PodTemplateSpec { +func (b *builder) podTemplate(hostNetwork bool, configDigest string) corev1.PodTemplateSpec { var ports []corev1.ContainerPort var tolerations []corev1.Toleration if b.desired.Kind == constants.DaemonSetKind { @@ -177,6 +177,8 @@ func (b *builder) podTemplate(configDigest string) corev1.PodTemplateSpec { }}, Containers: []corev1.Container{container}, ServiceAccountName: constants.FLPName, + HostNetwork: hostNetwork, + DNSPolicy: corev1.DNSClusterFirst, }, } } diff --git a/controllers/ovs/flowsconfig_reconciler.go b/controllers/ovs/flowsconfig_cno_reconciler.go similarity index 89% rename from controllers/ovs/flowsconfig_reconciler.go rename to controllers/ovs/flowsconfig_cno_reconciler.go index 89944595d..c8a0c0f28 100644 --- a/controllers/ovs/flowsconfig_reconciler.go +++ b/controllers/ovs/flowsconfig_cno_reconciler.go @@ -17,7 +17,7 @@ import ( "github.com/netobserv/network-observability-operator/controllers/reconcilers" ) -type FlowsConfigController struct { +type FlowsConfigCNOController struct { ovsConfigMapName string collectorNamespace string cnoNamespace string @@ -25,10 +25,10 @@ type FlowsConfigController struct { lookupIP func(string) ([]net.IP, error) } -func NewFlowsConfigController(client reconcilers.ClientHelper, +func NewFlowsConfigCNOController(client reconcilers.ClientHelper, collectorNamespace, cnoNamespace, ovsConfigMapName string, - lookupIP func(string) ([]net.IP, error)) *FlowsConfigController { - return &FlowsConfigController{ + lookupIP func(string) ([]net.IP, error)) *FlowsConfigCNOController { + return &FlowsConfigCNOController{ client: client, collectorNamespace: collectorNamespace, cnoNamespace: cnoNamespace, @@ -39,9 +39,9 @@ func NewFlowsConfigController(client reconcilers.ClientHelper, // Reconcile reconciles the status of the ovs-flows-config configmap with // the target FlowCollector ipfix section map -func (c *FlowsConfigController) Reconcile( +func (c *FlowsConfigCNOController) Reconcile( ctx context.Context, target *flowsv1alpha1.FlowCollector) error { - rlog := log.FromContext(ctx, "component", "FlowsConfigController") + rlog := log.FromContext(ctx, "component", "FlowsConfigCNOController") current, err := c.current(ctx) if err != nil { @@ -91,7 +91,7 @@ func (c *FlowsConfigController) Reconcile( return nil } -func (c *FlowsConfigController) current(ctx context.Context) (*flowsConfig, error) { +func (c *FlowsConfigCNOController) current(ctx context.Context) (*flowsConfig, error) { curr := &corev1.ConfigMap{} if err := c.client.Get(ctx, types.NamespacedName{ Name: c.ovsConfigMapName, @@ -110,7 +110,7 @@ func (c *FlowsConfigController) current(ctx context.Context) (*flowsConfig, erro return configFromMap(curr.Data) } -func (c *FlowsConfigController) desired( +func (c *FlowsConfigCNOController) desired( ctx context.Context, coll *flowsv1alpha1.FlowCollector) (*flowsConfig, error) { conf := flowsConfig{FlowCollectorIPFIX: coll.Spec.IPFIX} @@ -153,7 +153,7 @@ func (c *FlowsConfigController) desired( return nil, fmt.Errorf("unexpected flowlogsPipeline kind: %s", coll.Spec.FlowlogsPipeline.Kind) } -func (c *FlowsConfigController) flowsConfigMap(fc *flowsConfig) (*corev1.ConfigMap, error) { +func (c *FlowsConfigCNOController) flowsConfigMap(fc *flowsConfig) (*corev1.ConfigMap, error) { cm := &corev1.ConfigMap{ TypeMeta: v1.TypeMeta{ Kind: "ConfigMap", diff --git a/controllers/ovs/flowsconfig_ovnk_reconciler.go b/controllers/ovs/flowsconfig_ovnk_reconciler.go new file mode 100644 index 000000000..8409c0089 --- /dev/null +++ b/controllers/ovs/flowsconfig_ovnk_reconciler.go @@ -0,0 +1,157 @@ +package ovs + +import ( + "context" + "errors" + "fmt" + "net" + "strconv" + "time" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/log" + + flowsv1alpha1 "github.com/netobserv/network-observability-operator/api/v1alpha1" + "github.com/netobserv/network-observability-operator/controllers/constants" + "github.com/netobserv/network-observability-operator/controllers/reconcilers" +) + +const ( + // Make configurable? + ovnkNamespace = "ovn-kubernetes" + ovnkDSName = "ovnkube-node" +) + +type FlowsConfigOVNKController struct { + namespace string + client reconcilers.ClientHelper + lookupIP func(string) ([]net.IP, error) +} + +func NewFlowsConfigOVNKController(client reconcilers.ClientHelper, namespace string, lookupIP func(string) ([]net.IP, error)) *FlowsConfigOVNKController { + return &FlowsConfigOVNKController{ + client: client, + namespace: namespace, + lookupIP: lookupIP, + } +} + +// Reconcile reconciles the status of the ovs-flows-config configmap with +// the target FlowCollector ipfix section map +func (c *FlowsConfigOVNKController) Reconcile( + ctx context.Context, target *flowsv1alpha1.FlowCollector) error { + rlog := log.FromContext(ctx, "component", "FlowsConfigOVNKController") + + current, err := c.current(ctx) + if err != nil { + return err + } + + desired, err := c.desired(ctx, target) + // compare current and desired + if err != nil { + return err + } + + ovnkubeNode := reconcilers.FindContainer(¤t.Spec.Template.Spec, "ovnkube-node") + if ovnkubeNode == nil { + return errors.New("could not find container ovnkube-node") + } + + anyUpdate := false + for k, v := range desired { + if checkUpdateEnv(k, v, ovnkubeNode) { + anyUpdate = true + } + } + if anyUpdate { + rlog.Info("Provided IPFIX configuration differs current configuration. Updating") + return c.client.Update(ctx, current) + } + + rlog.Info("No changes needed") + return nil +} + +func (c *FlowsConfigOVNKController) current(ctx context.Context) (*appsv1.DaemonSet, error) { + curr := &appsv1.DaemonSet{} + if err := c.client.Get(ctx, types.NamespacedName{ + Name: ovnkDSName, + Namespace: ovnkNamespace, + }, curr); err != nil { + return nil, fmt.Errorf("retrieving %s/%s daemonset: %w", ovnkNamespace, ovnkDSName, err) + } + return curr, nil +} + +func (c *FlowsConfigOVNKController) desired(ctx context.Context, coll *flowsv1alpha1.FlowCollector) (map[string]string, error) { + cacheTimeout, err := time.ParseDuration(coll.Spec.IPFIX.CacheActiveTimeout) + if err != nil { + return nil, err + } + + envs := map[string]string{ + "OVN_IPFIX_TARGETS": "", + "OVN_IPFIX_CACHE_ACTIVE_TIMEOUT": strconv.Itoa(int(cacheTimeout.Seconds())), + "OVN_IPFIX_CACHE_MAX_FLOWS": strconv.Itoa(int(coll.Spec.IPFIX.CacheMaxFlows)), + "OVN_IPFIX_SAMPLING": strconv.Itoa(int(coll.Spec.IPFIX.Sampling)), + } + + if coll.Spec.Agent != flowsv1alpha1.AgentIPFIX { + // No IPFIX => leave target empty and return + return envs, nil + } + + // According to the "OVS flow export configuration" RFE: + // nodePort be set by the NOO when the collector is deployed as a DaemonSet + // sharedTarget set when deployed as Deployment + Service + switch coll.Spec.FlowlogsPipeline.Kind { + case constants.DaemonSetKind: + envs["OVN_IPFIX_TARGETS"] = fmt.Sprintf(":%d", coll.Spec.FlowlogsPipeline.Port) + case constants.DeploymentKind: + svc := corev1.Service{} + if err := c.client.Get(ctx, types.NamespacedName{ + Namespace: c.namespace, + Name: constants.FLPName, + }, &svc); err != nil { + return nil, fmt.Errorf("can't get service %s in %s: %w", constants.FLPName, c.namespace, err) + } + // service IP resolution + svcHost := svc.Name + "." + svc.Namespace + addrs, err := c.lookupIP(svcHost) + if err != nil { + return nil, fmt.Errorf("can't resolve IP address for service %v: %w", svcHost, err) + } + var ip string + for _, addr := range addrs { + if len(addr) > 0 { + ip = addr.String() + break + } + } + if ip == "" { + return nil, fmt.Errorf("can't find any suitable IP for host %s", svcHost) + } + envs["OVN_IPFIX_TARGETS"] = net.JoinHostPort(ip, strconv.Itoa(int(coll.Spec.FlowlogsPipeline.Port))) + } + return envs, nil +} + +func checkUpdateEnv(name, value string, container *corev1.Container) bool { + for i, env := range container.Env { + if env.Name == name { + if env.Value == value { + return false + } + container.Env[i].Value = value + return true + } + } + container.Env = append(container.Env, corev1.EnvVar{ + Name: name, + Value: value, + }) + return true +}