diff --git a/pkg/frr/frr.go b/pkg/frr/frr.go
index 70fe534d..0f1f90b0 100644
--- a/pkg/frr/frr.go
+++ b/pkg/frr/frr.go
@@ -14,6 +14,7 @@ import (
 	"net"
 	"os"
 	"os/exec"
+	"os/user"
 	"path"
 	"reflect"
 	"strconv"
@@ -23,6 +24,7 @@ import (
 	"github.com/opiproject/opi-evpn-bridge/pkg/config"
 	"github.com/opiproject/opi-evpn-bridge/pkg/infradb"
 	"github.com/opiproject/opi-evpn-bridge/pkg/infradb/common"
+	"github.com/opiproject/opi-evpn-bridge/pkg/infradb/subscriberframework/actionbus"
 	"github.com/opiproject/opi-evpn-bridge/pkg/infradb/subscriberframework/eventbus"
 	"github.com/opiproject/opi-evpn-bridge/pkg/utils"
 )
@@ -30,9 +32,40 @@ import (
 // frrComp string constant
 const frrComp string = "frr"
 
+// replayThreshold is the time threshold above which a replay is requested
+const replayThreshold = 64 * time.Second
+
 // ModulefrrHandler empty structure
 type ModulefrrHandler struct{}
 
+// ModuleFrrActionHandler holds the configuration files of the FRR daemon
+type ModuleFrrActionHandler struct {
+	// runningFrrConfFile holds the running configuration of the FRR daemon
+	runningFrrConfFile string
+	// basicFrrConfFile holds the basic/initial configuration of the FRR daemon
+	basicFrrConfFile string
+	// backupFrrConfFile holds the backup of the current running config of the FRR daemon
+	backupFrrConfFile string
+}
+
+// NewModuleFrrActionHandler initializes a default ModuleFrrActionHandler
+func NewModuleFrrActionHandler() *ModuleFrrActionHandler {
+	return &ModuleFrrActionHandler{
+		runningFrrConfFile: "/etc/frr/frr.conf",
+		basicFrrConfFile:   "/etc/frr/frr-basic.conf",
+		backupFrrConfFile:  "/etc/frr/frr.conf.bak",
+	}
+}
+
+// NewModuleFrrActionHandlerWithArgs initializes a ModuleFrrActionHandler with explicit config file paths
+func NewModuleFrrActionHandlerWithArgs(runningFrrConfFile, basicFrrConfFile, backupFrrConfFile string) *ModuleFrrActionHandler {
+	return &ModuleFrrActionHandler{
+		runningFrrConfFile: runningFrrConfFile,
+		basicFrrConfFile:   basicFrrConfFile,
+		backupFrrConfFile:  backupFrrConfFile,
+	}
+}
+
 // HandleEvent handles the events
 func (h *ModulefrrHandler) HandleEvent(eventType string, objectData *eventbus.ObjectData) {
 	switch eventType {
@@ -47,6 +80,82 @@ func (h *ModulefrrHandler) HandleEvent(eventType string, objectData *eventbus.Ob
 	}
 }
 
+// HandleAction handles the actions
+func (h *ModuleFrrActionHandler) HandleAction(actionType string, actionData *actionbus.ActionData) {
+	switch actionType {
+	case "preReplay":
+		log.Printf("Module FRR received %s\n", actionType)
+		h.handlePreReplay(actionData)
+	default:
+		log.Printf("error: Unknown action type %s", actionType)
+	}
+}
+
+func (h *ModuleFrrActionHandler) handlePreReplay(actionData *actionbus.ActionData) {
+	var deferErr error
+
+	defer func() {
+		// ErrCh is used to notify the sender whether the preReplay
+		// step has executed successfully.
+		actionData.ErrCh <- deferErr
+	}()
+
+	// Backup the current running config
+	deferErr = os.Rename(h.runningFrrConfFile, h.backupFrrConfFile)
+	if deferErr != nil {
+		log.Printf("FRR: handlePreReplay(): Failed to backup running config of FRR: %s\n", deferErr)
+		return
+	}
+
+	// Create a new running config based on the basic/initial FRR config
+	input, deferErr := os.ReadFile(h.basicFrrConfFile)
+	if deferErr != nil {
+		log.Printf("FRR: handlePreReplay(): Failed to read content of %s: %s\n", h.basicFrrConfFile, deferErr)
+		return
+	}
+
+	deferErr = os.WriteFile(h.runningFrrConfFile, input, 0600)
+	if deferErr != nil {
+		log.Printf("FRR: handlePreReplay(): Failed to write content to %s: %s\n", h.runningFrrConfFile, deferErr)
+		return
+	}
+
+	// Change ownership of the frr.conf to frr:frr
+	frrUser, deferErr := user.Lookup("frr")
+	if deferErr != nil {
+		log.Printf("FRR: handlePreReplay(): Failed to look up user frr: %s\n", deferErr)
+		return
+	}
+
+	uid, deferErr := strconv.Atoi(frrUser.Uid)
+	if deferErr != nil {
+		log.Printf("FRR: handlePreReplay(): Failed to convert frr UID string to int: %s\n", deferErr)
+		return
+	}
+
+	gid, deferErr := strconv.Atoi(frrUser.Gid)
+	if deferErr != nil {
+		log.Printf("FRR: handlePreReplay(): Failed to convert frr GID string to int: %s\n", deferErr)
+		return
+	}
+
+	deferErr = os.Chown(h.runningFrrConfFile, uid, gid)
+	if deferErr != nil {
+		log.Printf("FRR: handlePreReplay(): Failed to chown %s to frr:frr: %s\n", h.runningFrrConfFile, deferErr)
+		return
+	}
+
+	// Restart FRR daemon
+	_, errCmd := utils.Run([]string{"systemctl", "restart", "frr"}, false)
+	if errCmd != 0 {
+		log.Println("FRR: handlePreReplay(): Failed to restart FRR daemon")
+		deferErr = fmt.Errorf("restart FRR daemon failed")
+		return
+	}
+
+	log.Println("FRR: handlePreReplay(): The pre-replay procedure has executed successfully")
+}
+
 // handlesvi handles the svi functionality
 //
 //nolint:funlen,gocognit
@@ -106,6 +215,10 @@ func handlesvi(objectData *eventbus.ObjectData) {
 			comp.CompStatus = common.ComponentStatusError
 		}
 		log.Printf("%+v\n", comp)
+
+		// Check the timer to decide whether a replay is needed
+		comp.Replay = utils.CheckReplayThreshold(comp.Timer, replayThreshold)
+
 		err := infradb.UpdateSviStatus(objectData.Name, objectData.ResourceVersion, objectData.NotificationID, nil, comp)
 		if err != nil {
 			log.Printf("error in updating svi status: %s\n", err)
@@ -125,6 +238,10 @@ func handlesvi(objectData *eventbus.ObjectData) {
 			comp.CompStatus = common.ComponentStatusError
 		}
 		log.Printf("%+v\n", comp)
+
+		// Check the timer to decide whether a replay is needed
+		comp.Replay = utils.CheckReplayThreshold(comp.Timer, replayThreshold)
+
 		err := infradb.UpdateSviStatus(objectData.Name, objectData.ResourceVersion, objectData.NotificationID, nil, comp)
 		if err != nil {
 			log.Printf("error in updating svi status: %s\n", err)
@@ -199,6 +316,10 @@ func handlevrf(objectData *eventbus.ObjectData) {
 			comp.CompStatus = common.ComponentStatusError
 		}
 		log.Printf("%+v\n", comp)
+
+		// Check the timer to decide whether a replay is needed
+		comp.Replay = utils.CheckReplayThreshold(comp.Timer, replayThreshold)
+
 		err := infradb.UpdateVrfStatus(objectData.Name, objectData.ResourceVersion, objectData.NotificationID, nil, comp)
 		if err != nil {
 			log.Printf("error in updating vrf status: %s\n", err)
@@ -218,6 +339,10 @@ func handlevrf(objectData *eventbus.ObjectData) {
 			comp.CompStatus = common.ComponentStatusError
 		}
 		log.Printf("%+v\n", comp)
+
+		// Check the timer to decide whether a replay is needed
+		comp.Replay = utils.CheckReplayThreshold(comp.Timer, replayThreshold)
+
 		err := infradb.UpdateVrfStatus(objectData.Name, objectData.ResourceVersion, objectData.NotificationID, nil, comp)
 		if err != nil {
 			log.Printf("error in updating vrf status: %s\n", err)
@@ -252,6 +377,7 @@ var localas int
 // subscribeInfradb function handles the infradb subscriptions
 func subscribeInfradb(config *config.Config) {
 	eb := eventbus.EBus
+	ab := actionbus.ABus
 	for _, subscriberConfig := range config.Subscribers {
 		if subscriberConfig.Name == frrComp {
 			for _, eventType := range subscriberConfig.Events {
@@ -259,6 +385,7 @@ func subscribeInfradb(config *config.Config) {
 			}
 		}
 	}
+	ab.StartSubscriber(frrComp, "preReplay", NewModuleFrrActionHandler())
 }
 
 // ctx variable of type context
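The new action-handler path is symmetric to the existing event-handler path. As a reviewer sketch (not part of this diff), here is how a second module could wire itself up the same way ModuleFrrActionHandler does, assuming only the actionbus package introduced later in this patch; the "dummy" module name is hypothetical:

```go
package main

import (
	"log"

	"github.com/opiproject/opi-evpn-bridge/pkg/infradb/subscriberframework/actionbus"
)

type dummyActionHandler struct{}

// HandleAction must send exactly one value on ErrCh, since
// startReplayProcedure blocks waiting for the pre-replay outcome.
func (h *dummyActionHandler) HandleAction(actionType string, actionData *actionbus.ActionData) {
	switch actionType {
	case "preReplay":
		log.Println("dummy module: running pre-replay cleanup")
		actionData.ErrCh <- nil // report success to the replay orchestrator
	default:
		log.Printf("dummy module: unknown action type %s", actionType)
	}
}

func main() {
	// Register for the same action the FRR module subscribes to.
	actionbus.ABus.StartSubscriber("dummy", "preReplay", &dummyActionHandler{})
	select {} // keep the subscriber goroutine alive
}
```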
diff --git a/pkg/infradb/bridge.go b/pkg/infradb/bridge.go
index 1a3d7e44..5da14d8c 100644
--- a/pkg/infradb/bridge.go
+++ b/pkg/infradb/bridge.go
@@ -52,6 +52,7 @@ type LogicalBridgeMetadata struct{}
 
 // LogicalBridge holds Logical Bridge info
 type LogicalBridge struct {
+	Domain
 	Name   string
 	Spec   *LogicalBridgeSpec
 	Status *LogicalBridgeStatus
@@ -211,3 +212,76 @@ func (in *LogicalBridge) DeleteBridgePort(bpName, bpMac string) error {
 func (in *LogicalBridge) GetName() string {
 	return in.Name
 }
+
+// setComponentState sets the state of the component
+func (in *LogicalBridge) setComponentState(component common.Component) {
+	lbComponents := in.Status.Components
+	for i, comp := range lbComponents {
+		if comp.Name == component.Name {
+			in.Status.Components[i] = component
+			break
+		}
+	}
+}
+
+// checkForAllSuccess checks if all the components are in Success state
+func (in *LogicalBridge) checkForAllSuccess() bool {
+	for _, comp := range in.Status.Components {
+		if comp.CompStatus != common.ComponentStatusSuccess {
+			return false
+		}
+	}
+	return true
+}
+
+// parseMeta parses the metadata
+func (in *LogicalBridge) parseMeta(lbMeta *LogicalBridgeMetadata) {
+	if lbMeta != nil {
+		in.Metadata = lbMeta
+	}
+}
+
+func (in *LogicalBridge) getStatusComponents() []common.Component {
+	return in.Status.Components
+}
+
+func (in *LogicalBridge) setStatusComponents(components []common.Component) {
+	copy(in.Status.Components, components)
+}
+
+func (in *LogicalBridge) isOperationalStatus(operStatus OperStatus) bool {
+	switch operStatus {
+	case OperStatusUp:
+		return in.Status.LBOperStatus == LogicalBridgeOperStatusUp
+	case OperStatusDown:
+		return in.Status.LBOperStatus == LogicalBridgeOperStatusDown
+	case OperStatusToBeDeleted:
+		return in.Status.LBOperStatus == LogicalBridgeOperStatusToBeDeleted
+	case OperStatusUnspecified:
+		return in.Status.LBOperStatus == LogicalBridgeOperStatusUnspecified
+	default:
+		log.Println("isOperationalStatus(): operational status has not been identified")
+		return false
+	}
+}
+
+func (in *LogicalBridge) setOperationalStatus(operStatus OperStatus) {
+	switch operStatus {
+	case OperStatusUp:
+		in.Status.LBOperStatus = LogicalBridgeOperStatusUp
+	case OperStatusDown:
+		in.Status.LBOperStatus = LogicalBridgeOperStatusDown
+	case OperStatusToBeDeleted:
+		in.Status.LBOperStatus = LogicalBridgeOperStatusToBeDeleted
+	case OperStatusUnspecified:
+		in.Status.LBOperStatus = LogicalBridgeOperStatusUnspecified
+	default:
+		log.Println("setOperationalStatus(): operational status has not been identified")
+	}
+}
+
+// TODO: This function can probably be moved to domain.go as the ResourceVersion
+// field is common for all the child objects (VRF, LB, BP, SVI)
+func (in *LogicalBridge) setNewResourceVersion() {
+	in.ResourceVersion = generateVersion()
+}
diff --git a/pkg/infradb/common/common.go b/pkg/infradb/common/common.go
index 157a5709..b6af2b27 100644
--- a/pkg/infradb/common/common.go
+++ b/pkg/infradb/common/common.go
@@ -34,6 +34,8 @@ type Component struct {
 	// Free format json string
 	Details string
 	Timer   time.Duration
+	// Replay is used when the module wants to trigger a replay action
+	Replay bool
 }
 
 func ip4ToInt(ip net.IP) uint32 {
diff --git a/pkg/infradb/domain.go b/pkg/infradb/domain.go
new file mode 100644
index 00000000..ce658338
--- /dev/null
+++ b/pkg/infradb/domain.go
@@ -0,0 +1,66 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright (c) 2023-2024 Intel Corporation, or its subsidiaries.
+// Copyright (c) 2024 Ericsson AB.
+
+package infradb
+
+import (
+	"github.com/opiproject/opi-evpn-bridge/pkg/infradb/common"
+	"github.com/opiproject/opi-evpn-bridge/pkg/infradb/subscriberframework/eventbus"
+)
+
+// IDomain holds the utilities for child objects
+type IDomain interface {
+	getStatusComponents() []common.Component
+	setStatusComponents([]common.Component)
+	isOperationalStatus(OperStatus) bool
+	setOperationalStatus(OperStatus)
+	setNewResourceVersion()
+}
+
+// OperStatus operational status
+type OperStatus int32
+
+const (
+	// OperStatusUnspecified for "unknown" state
+	OperStatusUnspecified OperStatus = iota
+	// OperStatusUp for "up" state
+	OperStatusUp = iota
+	// OperStatusDown for "down" state
+	OperStatusDown = iota
+	// OperStatusToBeDeleted for "to be deleted" state
+	OperStatusToBeDeleted = iota
+)
+
+// Domain holds domain info
+type Domain struct {
+	IDomain IDomain
+}
+
+// prepareObjectsForReplay prepares an object for replay by setting the unsuccessful
+// components to Pending state and returning the list of subscribers that need to be
+// contacted for the replay of the particular object that called the function.
+func (in *Domain) prepareObjectsForReplay(componentName string, objSubs []*eventbus.Subscriber) []*eventbus.Subscriber {
+	// We assume that the list of Components that is returned
+	// from the DB is ordered based on priority, as that was the
+	// way it was stored in the DB in the first place.
+
+	components := in.IDomain.getStatusComponents()
+	auxComponents := append([]common.Component{}, components...)
+	tempSubs := []*eventbus.Subscriber{}
+	for i, comp := range components {
+		if comp.Name == componentName || comp.CompStatus != common.ComponentStatusSuccess {
+			auxComponents[i] = common.Component{Name: comp.Name, CompStatus: common.ComponentStatusPending, Details: ""}
+			tempSubs = append(tempSubs, objSubs[i])
+		}
+	}
+
+	in.IDomain.setStatusComponents(auxComponents)
+
+	if in.IDomain.isOperationalStatus(OperStatusUp) {
+		in.IDomain.setOperationalStatus(OperStatusDown)
+	}
+
+	in.IDomain.setNewResourceVersion()
+	return tempSubs
+}
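A sketch of how the IDomain/Domain pair is meant to be driven, mirroring what replay.go does below. Illustrative only: it would have to live inside package infradb since the interface methods are unexported, and the helper name prepareVrfForReplay is made up:

```go
package infradb

import (
	"github.com/opiproject/opi-evpn-bridge/pkg/infradb/subscriberframework/eventbus"
)

// prepareVrfForReplay (hypothetical helper) shows the generic Domain
// wrapper driving replay preparation for one concrete child object.
func prepareVrfForReplay(vrf *Vrf, componentName string) []*eventbus.Subscriber {
	vrfSubs := eventbus.EBus.GetSubscribers("vrf")

	// The Domain wrapper only needs the IDomain view of the object:
	// components list, operational status, and resource versioning.
	domain := &Domain{IDomain: vrf}

	// Unsuccessful components (and the requesting one) are reset to
	// Pending, the object is marked Down, and a fresh ResourceVersion
	// is generated; the returned subscribers get the replay tasks.
	return domain.prepareObjectsForReplay(componentName, vrfSubs)
}
```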
diff --git a/pkg/infradb/infradb.go b/pkg/infradb/infradb.go
index 664db24d..7eed4da4 100644
--- a/pkg/infradb/infradb.go
+++ b/pkg/infradb/infradb.go
@@ -286,7 +286,7 @@ func UpdateLBStatus(name string, resourceVersion string, notificationID string,
 	globalLock.Lock()
 	defer globalLock.Unlock()
 
-	var lastCompSuccsess bool
+	var allCompSuccess bool
 
 	// When we get an error from an operation to the Database then we just return it. The
 	// Task manager will just expire the task and retry.
@@ -311,27 +311,26 @@ func UpdateLBStatus(name string, resourceVersion string, notificationID string,
 		return nil
 	}
 
-	lbComponents := lb.Status.Components
-	for i, comp := range lbComponents {
-		compCounter := i + 1
-		if comp.Name == component.Name {
-			lb.Status.Components[i] = component
+	if component.Replay {
+		// One of the components has requested a replay of the DB.
+		// The task related to the status update will be dropped.
+		log.Printf("UpdateLBStatus(): Component %s has requested a replay\n", component.Name)
+		taskmanager.TaskMan.StatusUpdated(lb.Name, "logical-bridge", lb.ResourceVersion, notificationID, true, &component)
+		go startReplayProcedure(component.Name)
+		return nil
+	}
 
-			if compCounter == len(lbComponents) && lb.Status.Components[i].CompStatus == common.ComponentStatusSuccess {
-				lastCompSuccsess = true
-			}
+	// Set the state of the component
+	lb.setComponentState(component)
 
-			break
-		}
-	}
+	// Check if all the components are in Success state
+	allCompSuccess = lb.checkForAllSuccess()
 
 	// Parse the Metadata that has been sent from the Component
-	if lbMeta != nil {
-		lb.Metadata = lbMeta
-	}
+	lb.parseMeta(lbMeta)
 
 	// Is it ok to delete an object before we update the last component status to success ?
-	if lastCompSuccsess {
+	if allCompSuccess {
 		if lb.Status.LBOperStatus == LogicalBridgeOperStatusToBeDeleted {
 			err = infradb.client.Delete(lb.Name)
 			if err != nil {
@@ -606,7 +605,7 @@ func UpdateBPStatus(name string, resourceVersion string, notificationID string,
 	globalLock.Lock()
 	defer globalLock.Unlock()
 
-	var lastCompSuccsess bool
+	var allCompSuccess bool
 
 	// When we get an error from an operation to the Database then we just return it. The
 	// Task manager will just expire the task and retry.
@@ -631,30 +630,27 @@ func UpdateBPStatus(name string, resourceVersion string, notificationID string,
 		return nil
 	}
 
-	bpComponents := bp.Status.Components
-	for i, comp := range bpComponents {
-		compCounter := i + 1
-		if comp.Name == component.Name {
-			bp.Status.Components[i] = component
+	if component.Replay {
+		// One of the components has requested a replay of the DB.
+		// The task related to the status update will be dropped.
+		log.Printf("UpdateBPStatus(): Component %s has requested a replay\n", component.Name)
+		taskmanager.TaskMan.StatusUpdated(bp.Name, "bridge-port", bp.ResourceVersion, notificationID, true, &component)
+		go startReplayProcedure(component.Name)
+		return nil
+	}
 
-			if compCounter == len(bpComponents) && bp.Status.Components[i].CompStatus == common.ComponentStatusSuccess {
-				lastCompSuccsess = true
-			}
+	// Set the state of the component
+	bp.setComponentState(component)
 
-			break
-		}
-	}
+	// Check if all the components are in Success state
+	allCompSuccess = bp.checkForAllSuccess()
 
 	// Parse the Metadata that has been sent from the Component
-	if bpMeta != nil {
-		if bpMeta.VPort != "" {
-			bp.Metadata.VPort = bpMeta.VPort
-		}
-	}
+	bp.parseMeta(bpMeta)
 
 	// Is it ok to delete an object before we update the last component status to success ?
 	// Take care of deleting the references to the LB objects after the BP has been successfully deleted
-	if lastCompSuccsess {
+	if allCompSuccess {
 		if bp.Status.BPOperStatus == SviOperStatusToBeDeleted {
 			// Delete the references from Logical Bridge objects
 			for _, lbName := range bp.Spec.LogicalBridges {
@@ -926,7 +922,7 @@ func UpdateVrfStatus(name string, resourceVersion string, notificationID string,
 	globalLock.Lock()
 	defer globalLock.Unlock()
 
-	var lastCompSuccsess bool
+	var allCompSuccess bool
 
 	// When we get an error from an operation to the Database then we just return it. The
 	// Task manager will just expire the task and retry.
@@ -951,29 +947,27 @@ func UpdateVrfStatus(name string, resourceVersion string, notificationID string,
 		return nil
 	}
 
-	vrfComponents := vrf.Status.Components
-	for i, comp := range vrfComponents {
-		compCounter := i + 1
-		if comp.Name == component.Name {
-			vrf.Status.Components[i] = component
+	// Check whether the component has requested a replay of the DB
+	if component.Replay {
+		// One of the components has requested a replay of the DB.
+		// The task related to the status update will be dropped.
+		log.Printf("UpdateVrfStatus(): Component %s has requested a replay\n", component.Name)
+		taskmanager.TaskMan.StatusUpdated(vrf.Name, "vrf", vrf.ResourceVersion, notificationID, true, &component)
+		go startReplayProcedure(component.Name)
+		return nil
+	}
 
-			if compCounter == len(vrfComponents) && vrf.Status.Components[i].CompStatus == common.ComponentStatusSuccess {
-				lastCompSuccsess = true
-			}
+	// Set the state of the component
+	vrf.setComponentState(component)
 
-			break
-		}
-	}
+	// Check if all the components are in Success state
+	allCompSuccess = vrf.checkForAllSuccess()
 
 	// Parse the Metadata that has been sent from the Component
-	if vrfMeta != nil {
-		if len(vrfMeta.RoutingTable) > 0 {
-			vrf.Metadata.RoutingTable = vrfMeta.RoutingTable
-		}
-	}
+	vrf.parseMeta(vrfMeta)
 
 	// Is it ok to delete an object before we update the last component status to success ?
-	if lastCompSuccsess {
+	if allCompSuccess {
 		if vrf.Status.VrfOperStatus == VrfOperStatusToBeDeleted {
 			err = infradb.client.Delete(vrf.Name)
 			if err != nil {
@@ -1315,7 +1309,7 @@ func UpdateSviStatus(name string, resourceVersion string, notificationID string,
 	globalLock.Lock()
 	defer globalLock.Unlock()
 
-	var lastCompSuccsess bool
+	var allCompSuccess bool
 
 	// When we get an error from an operation to the Database then we just return it. The
 	// Task manager will just expire the task and retry.
@@ -1340,28 +1334,27 @@ func UpdateSviStatus(name string, resourceVersion string, notificationID string,
 		return nil
 	}
 
-	sviComponents := svi.Status.Components
-	for i, comp := range sviComponents {
-		compCounter := i + 1
-		if comp.Name == component.Name {
-			svi.Status.Components[i] = component
+	if component.Replay {
+		// One of the components has requested a replay of the DB.
+		// The task related to the status update will be dropped.
+		log.Printf("UpdateSviStatus(): Component %s has requested a replay\n", component.Name)
+		taskmanager.TaskMan.StatusUpdated(svi.Name, "svi", svi.ResourceVersion, notificationID, true, &component)
+		go startReplayProcedure(component.Name)
+		return nil
+	}
 
-			if compCounter == len(sviComponents) && svi.Status.Components[i].CompStatus == common.ComponentStatusSuccess {
-				lastCompSuccsess = true
-			}
+	// Set the state of the component
+	svi.setComponentState(component)
 
-			break
-		}
-	}
+	// Check if all the components are in Success state
+	allCompSuccess = svi.checkForAllSuccess()
 
 	// Parse the Metadata that has been sent from the Component
-	if sviMeta != nil {
-		svi.Metadata = sviMeta
-	}
+	svi.parseMeta(sviMeta)
 
 	// Is it ok to delete an object before we update the last component status to success ?
 	// Take care of deleting the references to the LB and VRF objects after the SVI has been successfully deleted
-	if lastCompSuccsess {
+	if allCompSuccess {
 		if svi.Status.SviOperStatus == SviOperStatusToBeDeleted {
 			// Delete the references from VRF and Logical Bridge objects
diff --git a/pkg/infradb/port.go b/pkg/infradb/port.go
index dfc3e6a9..ff1f9779 100644
--- a/pkg/infradb/port.go
+++ b/pkg/infradb/port.go
@@ -66,6 +66,7 @@ type BridgePortMetadata struct {
 
 // BridgePort holds Bridge Port info
 type BridgePort struct {
+	Domain
 	Name   string
 	Spec   *BridgePortSpec
 	Status *BridgePortStatus
@@ -184,3 +185,78 @@ func (in *BridgePort) ToPb() *pb.BridgePort {
 func (in *BridgePort) GetName() string {
 	return in.Name
 }
+
+// setComponentState sets the state of the component
+func (in *BridgePort) setComponentState(component common.Component) {
+	bpComponents := in.Status.Components
+	for i, comp := range bpComponents {
+		if comp.Name == component.Name {
+			in.Status.Components[i] = component
+			break
+		}
+	}
+}
+
+// checkForAllSuccess checks if all the components are in Success state
+func (in *BridgePort) checkForAllSuccess() bool {
+	for _, comp := range in.Status.Components {
+		if comp.CompStatus != common.ComponentStatusSuccess {
+			return false
+		}
+	}
+	return true
+}
+
+// parseMeta parses the metadata
+func (in *BridgePort) parseMeta(bpMeta *BridgePortMetadata) {
+	if bpMeta != nil {
+		if bpMeta.VPort != "" {
+			in.Metadata.VPort = bpMeta.VPort
+		}
+	}
+}
+
+func (in *BridgePort) getStatusComponents() []common.Component {
+	return in.Status.Components
+}
+
+func (in *BridgePort) setStatusComponents(components []common.Component) {
+	copy(in.Status.Components, components)
+}
+
+func (in *BridgePort) isOperationalStatus(operStatus OperStatus) bool {
+	switch operStatus {
+	case OperStatusUp:
+		return in.Status.BPOperStatus == BridgePortOperStatusUp
+	case OperStatusDown:
+		return in.Status.BPOperStatus == BridgePortOperStatusDown
+	case OperStatusToBeDeleted:
+		return in.Status.BPOperStatus == BridgePortOperStatusToBeDeleted
+	case OperStatusUnspecified:
+		return in.Status.BPOperStatus == BridgePortOperStatusUnspecified
+	default:
+		log.Println("isOperationalStatus(): operational status has not been identified")
+		return false
+	}
+}
+
+func (in *BridgePort) setOperationalStatus(operStatus OperStatus) {
+	switch operStatus {
+	case OperStatusUp:
+		in.Status.BPOperStatus = BridgePortOperStatusUp
+	case OperStatusDown:
+		in.Status.BPOperStatus = BridgePortOperStatusDown
+	case OperStatusToBeDeleted:
+		in.Status.BPOperStatus = BridgePortOperStatusToBeDeleted
+	case OperStatusUnspecified:
+		in.Status.BPOperStatus = BridgePortOperStatusUnspecified
+	default:
+		log.Println("setOperationalStatus(): operational status has not been identified")
+	}
+}
+
+// TODO: This function can probably be moved to domain.go as the ResourceVersion
+// field is common for all the child objects (VRF, LB, BP, SVI)
+func (in *BridgePort) setNewResourceVersion() {
+	in.ResourceVersion = generateVersion()
+}
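For context, the module side of this trigger is a one-liner before the status update, as the handlesvi/handlevrf hunks above show. A condensed sketch of that pattern (assumed glue code in package main; the 64s value matches replayThreshold in frr.go):

```go
package main

import (
	"time"

	"github.com/opiproject/opi-evpn-bridge/pkg/infradb/common"
	"github.com/opiproject/opi-evpn-bridge/pkg/utils"
)

// buildStatusComponent (hypothetical helper) builds the component status
// a module would pass to infradb.UpdateVrfStatus and friends.
func buildStatusComponent(taskTimer time.Duration) common.Component {
	comp := common.Component{
		Name:       "frr",
		CompStatus: common.ComponentStatusError,
		Timer:      taskTimer,
	}
	// If the task has been retried long enough that its timer exceeds
	// the threshold, request a full replay instead of another retry;
	// the UpdateXStatus functions then drop the task and kick off
	// startReplayProcedure.
	comp.Replay = utils.CheckReplayThreshold(comp.Timer, 64*time.Second)
	return comp
}

func main() {
	_ = buildStatusComponent(2 * time.Minute) // Replay would be true here
}
```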
has not been identified") + } +} + +// TODO: This function can probably be moved to the domain.go as the ResourceVersion +// field is common for all the child objects (VRF,LB, BP, SVI) +func (in *BridgePort) setNewResourceVersion() { + in.ResourceVersion = generateVersion() +} diff --git a/pkg/infradb/replay.go b/pkg/infradb/replay.go new file mode 100644 index 00000000..d4a418da --- /dev/null +++ b/pkg/infradb/replay.go @@ -0,0 +1,278 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright (c) 2023-2024 Intel Corporation, or its subsidiaries. +// Copyright (c) 2024 Ericsson AB + +// Package infradb exposes the interface for the manipulation of the api objects +package infradb + +import ( + "fmt" + "log" + + "github.com/opiproject/opi-evpn-bridge/pkg/infradb/subscriberframework/actionbus" + "github.com/opiproject/opi-evpn-bridge/pkg/infradb/subscriberframework/eventbus" + "github.com/opiproject/opi-evpn-bridge/pkg/infradb/taskmanager" +) + +func startReplayProcedure(componentName string) { + globalLock.Lock() + + var deferErr error + var preSubscriber *actionbus.Subscriber + var subsForReplay [][]*eventbus.Subscriber + var objectsToReplay []interface{} + + defer func() { + globalLock.Unlock() + log.Println("startReplayProcedure(): unblocking the TaskManager to continue") + taskmanager.TaskMan.ReplayFinished() + if deferErr != nil { + log.Println("startReplayProcedure(): The replay procedure has failed") + return + } + createReplayTasks(objectsToReplay, subsForReplay) + }() + + preSubscribers := actionbus.ABus.GetSubscribers("preReplay") + + for _, preSub := range preSubscribers { + if preSub.Name == componentName { + preSubscriber = preSub + break + } + } + + if preSubscriber == nil { + deferErr = fmt.Errorf("no pre-replay subscriber for %s", componentName) + log.Printf("startReplayProcedure(): Error %+v\n", deferErr) + return + } + + // Notify the preReplay subscriber + actionData := actionbus.NewActionData() + deferErr = actionbus.ABus.Publish(actionData, preSubscriber) + if deferErr != nil { + log.Printf("startReplayProcedure(): Error %+v\n", deferErr) + return + } + + // Waiting for the pre-replay procedure to finish + deferErr = <-actionData.ErrCh + close(actionData.ErrCh) + + if deferErr != nil { + log.Printf("startReplayProcedure(): Error %+v\n", deferErr) + return + } + + log.Printf("startReplayProcedure(): Component %s has successfully executed pre-replay steps", componentName) + + objectTypesToReplay := getObjectTypesToReplay(componentName) + + objectsToReplay, subsForReplay, deferErr = gatherObjectsAndSubsToReplay(componentName, objectTypesToReplay) + if deferErr != nil { + log.Printf("startReplayProcedure(): Error %+v\n", deferErr) + return + } +} + +// getObjectTypesToReplay collects all the types of object to be replayed +// which are related to the component that called the replay. 
+func getObjectTypesToReplay(componentName string) []string { + objectTypesToReplay := []string{} + typesAndSubs := make(map[string][]*eventbus.Subscriber) + + typesAndSubs["bridge-port"] = eventbus.EBus.GetSubscribers("bridge-port") + typesAndSubs["svi"] = eventbus.EBus.GetSubscribers("svi") + typesAndSubs["logical-bridge"] = eventbus.EBus.GetSubscribers("logical-bridge") + typesAndSubs["vrf"] = eventbus.EBus.GetSubscribers("vrf") + + for objType, subs := range typesAndSubs { + for _, sub := range subs { + if sub.Name == componentName { + objectTypesToReplay = append(objectTypesToReplay, objType) + break + } + } + } + + return objectTypesToReplay +} + +// nolint: funlen, gocognit +func gatherObjectsAndSubsToReplay(componentName string, objectTypesToReplay []string) ([]interface{}, [][]*eventbus.Subscriber, error) { + objectsToReplay := []interface{}{} + subsForReplay := [][]*eventbus.Subscriber{} + + bpSubs := eventbus.EBus.GetSubscribers("bridge-port") + sviSubs := eventbus.EBus.GetSubscribers("svi") + lbSubs := eventbus.EBus.GetSubscribers("logical-bridge") + vrfSubs := eventbus.EBus.GetSubscribers("vrf") + + domain := &Domain{} + + for _, objType := range objectTypesToReplay { + switch objType { + case "vrf": + vrfsMap := make(map[string]bool) + found, err := infradb.client.Get("vrfs", &vrfsMap) + + if err != nil { + return nil, nil, err + } + + if !found { + log.Println("gatherObjectsAndSubsToReplay(): No VRFs have been found") + continue + } + + for key := range vrfsMap { + vrf := &Vrf{} + found, err := infradb.client.Get(key, vrf) + if err != nil { + return nil, nil, err + } + + // Dimitris: Do we need to just continue here or throw error and stop ? + if !found { + return nil, nil, ErrKeyNotFound + } + + // tempSubs holds the subscribers list to be contacted for every VRF object each time + // for replay + domain.IDomain = vrf + tempSubs := domain.prepareObjectsForReplay(componentName, vrfSubs) + + err = infradb.client.Set(vrf.Name, vrf) + if err != nil { + return nil, nil, err + } + + subsForReplay = append(subsForReplay, tempSubs) + objectsToReplay = append(objectsToReplay, vrf) + } + case "logical-bridge": + lbsMap := make(map[string]bool) + found, err := infradb.client.Get("lbs", &lbsMap) + if err != nil { + return nil, nil, err + } + + if !found { + log.Println("gatherObjectsAndSubsToReplay(): No Logical Bridges have been found") + continue + } + for key := range lbsMap { + lb := &LogicalBridge{} + found, err := infradb.client.Get(key, lb) + if err != nil { + return nil, nil, err + } + + if !found { + return nil, nil, ErrKeyNotFound + } + + // tempSubs holds the subscribers list to be contacted for every VRF object each time + // for replay + domain.IDomain = lb + tempSubs := domain.prepareObjectsForReplay(componentName, lbSubs) + + err = infradb.client.Set(lb.Name, lb) + if err != nil { + return nil, nil, err + } + subsForReplay = append(subsForReplay, tempSubs) + objectsToReplay = append(objectsToReplay, lb) + } + case "svi": + svisMap := make(map[string]bool) + found, err := infradb.client.Get("svis", &svisMap) + if err != nil { + return nil, nil, err + } + + if !found { + log.Println("gatherObjectsAndSubsToReplay(): No SVIs have been found") + continue + } + for key := range svisMap { + svi := &Svi{} + found, err := infradb.client.Get(key, svi) + if err != nil { + return nil, nil, err + } + + if !found { + return nil, nil, ErrKeyNotFound + } + + // tempSubs holds the subscribers list to be contacted for every VRF object each time + // for replay + domain.IDomain = svi + 
tempSubs := domain.prepareObjectsForReplay(componentName, sviSubs) + + err = infradb.client.Set(svi.Name, svi) + if err != nil { + return nil, nil, err + } + subsForReplay = append(subsForReplay, tempSubs) + objectsToReplay = append(objectsToReplay, svi) + } + case "bp": + bpsMap := make(map[string]bool) + found, err := infradb.client.Get("bps", &bpsMap) + if err != nil { + return nil, nil, err + } + + if !found { + log.Println("gatherObjectsAndSubsToReplay(): No Bridge Ports have been found") + continue + } + for key := range bpsMap { + bp := &BridgePort{} + found, err := infradb.client.Get(key, bp) + if err != nil { + return nil, nil, err + } + + if !found { + return nil, nil, ErrKeyNotFound + } + + // tempSubs holds the subscribers list to be contacted for every VRF object each time + // for replay + domain.IDomain = bp + tempSubs := domain.prepareObjectsForReplay(componentName, bpSubs) + + err = infradb.client.Set(bp.Name, bp) + if err != nil { + return nil, nil, err + } + subsForReplay = append(subsForReplay, tempSubs) + objectsToReplay = append(objectsToReplay, bp) + } + } + } + + return objectsToReplay, subsForReplay, nil +} + +// createReplayTasks create new tasks for the realization of the new replay objects intents +func createReplayTasks(objectsToReplay []interface{}, subsForReplay [][]*eventbus.Subscriber) { + for i, obj := range objectsToReplay { + switch tempObj := obj.(type) { + case *Vrf: + taskmanager.TaskMan.CreateTask(tempObj.Name, "vrf", tempObj.ResourceVersion, subsForReplay[i]) + case *LogicalBridge: + taskmanager.TaskMan.CreateTask(tempObj.Name, "logical-bridge", tempObj.ResourceVersion, subsForReplay[i]) + case *Svi: + taskmanager.TaskMan.CreateTask(tempObj.Name, "svi", tempObj.ResourceVersion, subsForReplay[i]) + case *BridgePort: + taskmanager.TaskMan.CreateTask(tempObj.Name, "bridge-port", tempObj.ResourceVersion, subsForReplay[i]) + default: + log.Printf("createReplayTasks: Unknown object type %+v\n", tempObj) + } + } +} diff --git a/pkg/infradb/subscriberframework/actionbus/actionbus.go b/pkg/infradb/subscriberframework/actionbus/actionbus.go new file mode 100644 index 00000000..a2695a51 --- /dev/null +++ b/pkg/infradb/subscriberframework/actionbus/actionbus.go @@ -0,0 +1,139 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright (c) 2023-2024 Intel Corporation, or its subsidiaries. 
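The ErrCh handshake between startReplayProcedure and the pre-replay subscriber is the load-bearing piece of this file. A standalone reduction of that pattern with plain channels (illustrative only, not the real actionbus API):

```go
package main

import (
	"fmt"
	"log"
)

type actionData struct{ errCh chan error }

func main() {
	data := &actionData{errCh: make(chan error)}
	subCh := make(chan *actionData)

	// Subscriber side: the module runs its pre-replay step and reports
	// the outcome on errCh exactly once.
	go func() {
		ad := <-subCh
		err := func() error { return nil }() // pre-replay work goes here
		ad.errCh <- err
	}()

	// Orchestrator side: publish, then block until the subscriber is
	// done; only on success are the replay tasks created.
	subCh <- data
	if err := <-data.errCh; err != nil {
		log.Fatalf("pre-replay failed: %v", err)
	}
	close(data.errCh)
	fmt.Println("pre-replay done, creating replay tasks")
}
```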
diff --git a/pkg/infradb/subscriberframework/actionbus/actionbus.go b/pkg/infradb/subscriberframework/actionbus/actionbus.go
new file mode 100644
index 00000000..a2695a51
--- /dev/null
+++ b/pkg/infradb/subscriberframework/actionbus/actionbus.go
@@ -0,0 +1,139 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright (c) 2023-2024 Intel Corporation, or its subsidiaries.
+// Copyright (c) 2024 Ericsson AB
+
+// Package actionbus holds implementation for subscribing and receiving actions
+package actionbus
+
+import (
+	"fmt"
+	"log"
+	"sync"
+
+	"github.com/opiproject/opi-evpn-bridge/pkg/utils"
+)
+
+// ABus holds the ActionBus object
+var ABus = NewActionBus()
+
+// ActionBus holds the action bus info
+type ActionBus struct {
+	subscribers    map[string][]*Subscriber
+	actionHandlers map[string]ActionHandler
+	subscriberL    sync.RWMutex
+}
+
+// Subscriber holds the info for each subscriber
+type Subscriber struct {
+	Name string
+	Ch   chan interface{}
+	Quit chan bool
+}
+
+// ActionHandler handles the action requests that arrive
+type ActionHandler interface {
+	HandleAction(string, *ActionData)
+}
+
+// ActionData holds the data for each action
+type ActionData struct {
+	ErrCh chan error
+}
+
+// StartSubscriber will be called by the modules to initialize and start listening for actions
+func (a *ActionBus) StartSubscriber(moduleName, actionType string, actionHandler ActionHandler) {
+	if !a.subscriberExist(actionType, moduleName) {
+		subscriber := a.Subscribe(moduleName, actionType, actionHandler)
+
+		go func() {
+			for {
+				select {
+				case action := <-subscriber.Ch:
+					log.Printf("\nSubscriber %s for %s received \n", moduleName, actionType)
+
+					handlerKey := utils.ComposeHandlerName(moduleName, actionType)
+					if handler, ok := a.actionHandlers[handlerKey]; ok {
+						if actionData, ok := action.(*ActionData); ok {
+							handler.HandleAction(actionType, actionData)
+						} else {
+							log.Println("error: unexpected action type")
+						}
+					} else {
+						log.Println("error: no action handler found")
+					}
+				case <-subscriber.Quit:
+					close(subscriber.Ch)
+					return
+				}
+			}
+		}()
+	}
+}
+
+// NewActionBus initializes an ActionBus object
+func NewActionBus() *ActionBus {
+	return &ActionBus{
+		subscribers:    make(map[string][]*Subscriber),
+		actionHandlers: make(map[string]ActionHandler),
+	}
+}
+
+// NewActionData initializes an ActionData object
+func NewActionData() *ActionData {
+	return &ActionData{
+		ErrCh: make(chan error),
+	}
+}
+
+// Subscribe api provides registration of a subscriber to the given action
+func (a *ActionBus) Subscribe(moduleName, actionType string, actionHandler ActionHandler) *Subscriber {
+	a.subscriberL.Lock()
+	defer a.subscriberL.Unlock()
+
+	subscriber := &Subscriber{
+		Name: moduleName,
+		Ch:   make(chan interface{}),
+		Quit: make(chan bool),
+	}
+
+	a.subscribers[actionType] = append(a.subscribers[actionType], subscriber)
+
+	handlerKey := utils.ComposeHandlerName(moduleName, actionType)
+	a.actionHandlers[handlerKey] = actionHandler
+
+	log.Printf("Subscriber %s registered for action %s\n", moduleName, actionType)
+	return subscriber
+}
+
+// GetSubscribers api is used to fetch the list of subscribers registered with given actionType
+func (a *ActionBus) GetSubscribers(actionType string) []*Subscriber {
+	a.subscriberL.RLock()
+	defer a.subscriberL.RUnlock()
+
+	return a.subscribers[actionType]
+}
+
+// Publish api notifies the subscribers with certain actionType
+func (a *ActionBus) Publish(actionData *ActionData, subscriber *Subscriber) error {
+	var err error
+
+	select {
+	case subscriber.Ch <- actionData:
+		log.Printf("Publish(): Notification is sent to subscriber %s\n", subscriber.Name)
+	default:
+		err = fmt.Errorf("channel for subscriber %s is busy", subscriber.Name)
+	}
+	return err
+}
+
+func (a *ActionBus) subscriberExist(actionType string, moduleName string) bool {
+	subList := a.GetSubscribers(actionType)
+	if len(subList) != 0 {
+		for _, s := range subList {
+			if s.Name == moduleName {
+				return true
+			}
+		}
+	}
+	return false
+}
diff --git a/pkg/infradb/subscriberframework/eventbus/eventbus.go b/pkg/infradb/subscriberframework/eventbus/eventbus.go
index 47384474..68ad6ff4 100644
--- a/pkg/infradb/subscriberframework/eventbus/eventbus.go
+++ b/pkg/infradb/subscriberframework/eventbus/eventbus.go
@@ -9,6 +9,8 @@ import (
 	"log"
 	"sort"
 	"sync"
+
+	"github.com/opiproject/opi-evpn-bridge/pkg/utils"
 )
 
 // EBus holds the EventBus object
@@ -54,7 +56,7 @@ func (e *EventBus) StartSubscriber(moduleName, eventType string, priority int, e
 				case event := <-subscriber.Ch:
 					log.Printf("\nSubscriber %s for %s received \n", moduleName, eventType)
 
-					handlerKey := moduleName + "." + eventType
+					handlerKey := utils.ComposeHandlerName(moduleName, eventType)
 					if handler, ok := e.eventHandlers[handlerKey]; ok {
 						if objectData, ok := event.(*ObjectData); ok {
 							handler.HandleEvent(eventType, objectData)
@@ -96,7 +98,9 @@ func (e *EventBus) Subscribe(moduleName, eventType string, priority int, eventHa
 	}
 
 	e.subscribers[eventType] = append(e.subscribers[eventType], subscriber)
-	e.eventHandlers[moduleName+"."+eventType] = eventHandler
+
+	handlerKey := utils.ComposeHandlerName(moduleName, eventType)
+	e.eventHandlers[handlerKey] = eventHandler
 
 	// Sort subscribers based on priority
 	sort.Slice(e.subscribers[eventType], func(i, j int) bool {
@@ -137,7 +141,9 @@ func (e *EventBus) UnsubscribeModule(moduleName string) bool {
 			if sub.Name == moduleName {
 				sub.Quit <- true
 				e.subscribers[eventName] = append(subs[:i], subs[i+1:]...)
-				e.eventHandlers[moduleName+"."+eventName] = nil
+
+				handlerKey := utils.ComposeHandlerName(moduleName, eventName)
+				e.eventHandlers[handlerKey] = nil
 				log.Printf("\n Module %s is unsubscribed for event %s", sub.Name, eventName)
 			}
 		}
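With both buses now keying handlers through utils.ComposeHandlerName, the "<module>.<type>" convention lives in one place. A quick sanity sketch of the keys it produces:

```go
package main

import (
	"fmt"

	"github.com/opiproject/opi-evpn-bridge/pkg/utils"
)

func main() {
	// Both buses index their handlers by "<module>.<event-or-action>".
	fmt.Println(utils.ComposeHandlerName("frr", "preReplay")) // frr.preReplay
	fmt.Println(utils.ComposeHandlerName("frr", "vrf"))       // frr.vrf
}
```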
diff --git a/pkg/infradb/svi.go b/pkg/infradb/svi.go
index 354384ba..51472d1e 100644
--- a/pkg/infradb/svi.go
+++ b/pkg/infradb/svi.go
@@ -56,6 +56,7 @@ type SviMetadata struct {
 
 // Svi holds SVI info
 type Svi struct {
+	Domain
 	Name   string
 	Spec   *SviSpec
 	Status *SviStatus
@@ -168,3 +169,76 @@ func (in *Svi) ToPb() *pb.Svi {
 func (in *Svi) GetName() string {
 	return in.Name
 }
+
+// setComponentState sets the state of the component
+func (in *Svi) setComponentState(component common.Component) {
+	sviComponents := in.Status.Components
+	for i, comp := range sviComponents {
+		if comp.Name == component.Name {
+			in.Status.Components[i] = component
+			break
+		}
+	}
+}
+
+// checkForAllSuccess checks if all the components are in Success state
+func (in *Svi) checkForAllSuccess() bool {
+	for _, comp := range in.Status.Components {
+		if comp.CompStatus != common.ComponentStatusSuccess {
+			return false
+		}
+	}
+	return true
+}
+
+// parseMeta parses the metadata
+func (in *Svi) parseMeta(sviMeta *SviMetadata) {
+	if sviMeta != nil {
+		in.Metadata = sviMeta
+	}
+}
+
+func (in *Svi) getStatusComponents() []common.Component {
+	return in.Status.Components
+}
+
+func (in *Svi) setStatusComponents(components []common.Component) {
+	copy(in.Status.Components, components)
+}
+
+func (in *Svi) isOperationalStatus(operStatus OperStatus) bool {
+	switch operStatus {
+	case OperStatusUp:
+		return in.Status.SviOperStatus == SviOperStatusUp
+	case OperStatusDown:
+		return in.Status.SviOperStatus == SviOperStatusDown
+	case OperStatusToBeDeleted:
+		return in.Status.SviOperStatus == SviOperStatusToBeDeleted
+	case OperStatusUnspecified:
+		return in.Status.SviOperStatus == SviOperStatusUnspecified
+	default:
+		log.Println("isOperationalStatus(): operational status has not been identified")
+		return false
+	}
+}
+
+func (in *Svi) setOperationalStatus(operStatus OperStatus) {
+	switch operStatus {
+	case OperStatusUp:
+		in.Status.SviOperStatus = SviOperStatusUp
+	case OperStatusDown:
+		in.Status.SviOperStatus = SviOperStatusDown
+	case OperStatusToBeDeleted:
+		in.Status.SviOperStatus = SviOperStatusToBeDeleted
+	case OperStatusUnspecified:
+		in.Status.SviOperStatus = SviOperStatusUnspecified
+	default:
+		log.Println("setOperationalStatus(): operational status has not been identified")
+	}
+}
+
+// TODO: This function can probably be moved to domain.go as the ResourceVersion
+// field is common for all the child objects (VRF, LB, BP, SVI)
+func (in *Svi) setNewResourceVersion() {
+	in.ResourceVersion = generateVersion()
+}
diff --git a/pkg/infradb/taskmanager/taskmanager.go b/pkg/infradb/taskmanager/taskmanager.go
index 3f32c633..9cbc9754 100644
--- a/pkg/infradb/taskmanager/taskmanager.go
+++ b/pkg/infradb/taskmanager/taskmanager.go
@@ -23,6 +23,7 @@ var TaskMan = newTaskManager()
 type TaskManager struct {
 	taskQueue      *TaskQueue
 	taskStatusChan chan *TaskStatus
+	replayChan     chan struct{}
 }
 
 // Task corresponds to an object to be realized
@@ -50,6 +51,7 @@ func newTaskManager() *TaskManager {
 	return &TaskManager{
 		taskQueue:      NewTaskQueue(),
 		taskStatusChan: make(chan *TaskStatus),
+		replayChan:     make(chan struct{}),
 	}
 }
 
@@ -103,6 +105,12 @@ func (t *TaskManager) StatusUpdated(name, objectType, resourceVersion, notificat
 	log.Printf("StatusUpdated(): New Task Status has been created and sent to channel: %+v\n", taskStatus)
 }
 
+// ReplayFinished notifies that the replay of objects has finished
+func (t *TaskManager) ReplayFinished() {
+	close(t.replayChan)
+	log.Println("ReplayFinished(): Replay has finished.")
+}
+
 // processTasks processes the task
 func (t *TaskManager) processTasks() {
 	var taskStatus *TaskStatus
@@ -153,8 +161,9 @@ func (t *TaskManager) processTasks() {
 			}
 
 			// This check is needed in order to move to the next task if the status channel has timed out or we need to drop the task in case that
-			// the task of the object is referring to an old already updated object or the object is no longer in the database (has been deleted).
-			if taskStatus == nil || taskStatus.dropTask {
+			// the task of the object is referring to an old already updated object or the object is no longer in the database (has been deleted)
+			// or a replay procedure has been requested
+			if t.checkStatus(taskStatus) {
 				log.Println("processTasks(): Move to the next Task in the queue")
 				break loopTwo
 			}
@@ -176,3 +185,22 @@ func (t *TaskManager) processTasks() {
 		}
 	}
 }
+
+// checkStatus checks if the taskStatus is nil, if the current Task
+// should be dropped, or if a replay procedure has been requested
+func (t *TaskManager) checkStatus(taskStatus *TaskStatus) bool {
+	if taskStatus == nil {
+		return true
+	}
+
+	if taskStatus.dropTask {
+		if taskStatus.component.Replay {
+			log.Println("checkStatus(): Wait for the replay DB procedure to finish and move to the next Task in the queue")
+			<-t.replayChan
+			log.Println("checkStatus(): Replay has finished. Continuing processing tasks")
+		}
+		return true
+	}
+
+	return false
+}
diff --git a/pkg/infradb/vrf.go b/pkg/infradb/vrf.go
index 0d0665ea..1832c3ed 100644
--- a/pkg/infradb/vrf.go
+++ b/pkg/infradb/vrf.go
@@ -57,6 +57,7 @@ type VrfMetadata struct {
 
 // Vrf holds VRF info
 type Vrf struct {
+	Domain
 	Name   string
 	Spec   *VrfSpec
 	Status *VrfStatus
@@ -238,3 +239,78 @@ func (in *Vrf) DeleteSvi(sviName string) error {
 func (in *Vrf) GetName() string {
 	return in.Name
 }
+
+// setComponentState sets the state of the component
+func (in *Vrf) setComponentState(component common.Component) {
+	vrfComponents := in.Status.Components
+	for i, comp := range vrfComponents {
+		if comp.Name == component.Name {
+			in.Status.Components[i] = component
+			break
+		}
+	}
+}
+
+// checkForAllSuccess checks if all the components are in Success state
+func (in *Vrf) checkForAllSuccess() bool {
+	for _, comp := range in.Status.Components {
+		if comp.CompStatus != common.ComponentStatusSuccess {
+			return false
+		}
+	}
+	return true
+}
+
+// parseMeta parses the metadata
+func (in *Vrf) parseMeta(vrfMeta *VrfMetadata) {
+	if vrfMeta != nil {
+		if len(vrfMeta.RoutingTable) > 0 {
+			in.Metadata.RoutingTable = vrfMeta.RoutingTable
+		}
+	}
+}
+
+func (in *Vrf) getStatusComponents() []common.Component {
+	return in.Status.Components
+}
+
+func (in *Vrf) setStatusComponents(components []common.Component) {
+	copy(in.Status.Components, components)
+}
+
+func (in *Vrf) isOperationalStatus(operStatus OperStatus) bool {
+	switch operStatus {
+	case OperStatusUp:
+		return in.Status.VrfOperStatus == VrfOperStatusUp
+	case OperStatusDown:
+		return in.Status.VrfOperStatus == VrfOperStatusDown
+	case OperStatusToBeDeleted:
+		return in.Status.VrfOperStatus == VrfOperStatusToBeDeleted
+	case OperStatusUnspecified:
+		return in.Status.VrfOperStatus == VrfOperStatusUnspecified
+	default:
+		log.Println("isOperationalStatus(): operational status has not been identified")
+		return false
+	}
+}
+
+func (in *Vrf) setOperationalStatus(operStatus OperStatus) {
+	switch operStatus {
+	case OperStatusUp:
+		in.Status.VrfOperStatus = VrfOperStatusUp
+	case OperStatusDown:
+		in.Status.VrfOperStatus = VrfOperStatusDown
+	case OperStatusToBeDeleted:
+		in.Status.VrfOperStatus = VrfOperStatusToBeDeleted
+	case OperStatusUnspecified:
+		in.Status.VrfOperStatus = VrfOperStatusUnspecified
+	default:
+		log.Println("setOperationalStatus(): operational status has not been identified")
+	}
+}
+
+// TODO: This function can probably be moved to domain.go as the ResourceVersion
+// field is common for all the child objects (VRF, LB, BP, SVI)
+func (in *Vrf) setNewResourceVersion() {
+	in.ResourceVersion = generateVersion()
+}
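The taskmanager hunk above uses close(replayChan) as a broadcast, so every task blocked in checkStatus() resumes at once. A minimal standalone sketch of that pattern (note that a channel can only be closed once; re-arming for a second replay would need a fresh channel):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	replayChan := make(chan struct{})
	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-replayChan // parked, like checkStatus() waiting on the replay
			fmt.Printf("worker %d resumes processing tasks\n", id)
		}(i)
	}

	close(replayChan) // ReplayFinished(): releases all waiters at once
	wg.Wait()
}
```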
diff --git a/pkg/utils/helpers.go b/pkg/utils/helpers.go
index 246f19bf..a59feb33 100644
--- a/pkg/utils/helpers.go
+++ b/pkg/utils/helpers.go
@@ -8,7 +8,9 @@ import (
 	"context"
 	"log"
 	"net"
+	"os/exec"
 	"regexp"
+	"time"
 
 	"github.com/vishvananda/netlink"
 	"go.einride.tech/aip/fieldmask"
@@ -22,6 +24,18 @@ func ApplyMaskToStoredPbObject[T proto.Message](updateMask *fieldmaskpb.FieldMas
 	fieldmask.Update(updateMask, dst, src)
 }
 
+// Run executes the given command and returns its combined output and a return code (0 on success, -1 on failure)
+func Run(cmd []string, _ bool) (string, int) {
+	var out []byte
+	var err error
+	out, err = exec.Command(cmd[0], cmd[1:]...).CombinedOutput() //nolint:gosec
+	if err != nil {
+		return "Error in running command", -1
+	}
+	output := string(out)
+	return output, 0
+}
+
 // ValidateMacAddress validates if a passing MAC address
 // has the right format
 func ValidateMacAddress(b []byte) error {
@@ -64,3 +78,13 @@ func GetIPAddress(dev string) net.IPNet {
 	}
 	return *validIps[0].IPNet
 }
+
+// CheckReplayThreshold checks if the replay threshold has been exceeded
+func CheckReplayThreshold(currentTimer, replayThreshold time.Duration) bool {
+	return (currentTimer > replayThreshold)
+}
+
+// ComposeHandlerName concatenates the module name and the event/action type into a handler key
+func ComposeHandlerName(moduleName, kindOfType string) string {
+	return moduleName + "." + kindOfType
+}
diff --git a/pkg/utils/netlink.go b/pkg/utils/netlink.go
index 34c397cf..86a98fb4 100644
--- a/pkg/utils/netlink.go
+++ b/pkg/utils/netlink.go
@@ -10,7 +10,6 @@ import (
 	"context"
 	"fmt"
 	"net"
-	"os/exec"
 
 	"github.com/vishvananda/netlink"
 
@@ -58,22 +57,6 @@ type Netlink interface {
 	RouteLookup(context.Context, string, string) (string, error)
 }
 
-// run function run the commands
-func run(cmd []string, _ bool) (string, int) {
-	var out []byte
-	var err error
-	out, err = exec.Command(cmd[0], cmd[1:]...).CombinedOutput() //nolint:gosec
-	if err != nil {
-		/*if flag {
-			// panic(fmt.Sprintf("Command %s': exit code %s;", out, err.Error()))
-		}
-		// fmt.Printf("Command %s': exit code %s;\n", out, err)*/
-		return "Error in running command", -1
-	}
-	output := string(out)
-	return output, 0
-}
-
 // NetlinkWrapper wrapper for netlink package
 type NetlinkWrapper struct {
 	tracer trace.Tracer
@@ -295,7 +278,7 @@ func (n *NetlinkWrapper) RouteAdd(ctx context.Context, route *netlink.Route) err
 
 // RouteFlushTable is a wrapper for netlink.RouteFlushTable
 func (n *NetlinkWrapper) RouteFlushTable(_ context.Context, routingTable string) error {
-	_, err := run([]string{"ip", "route", "flush", "table", routingTable}, false)
+	_, err := Run([]string{"ip", "route", "flush", "table", routingTable}, false)
 	if err != 0 {
 		return fmt.Errorf("lgm: Error in executing command ip route flush table %s", routingTable)
 	}
@@ -304,13 +287,13 @@ func (n *NetlinkWrapper) RouteFlushTable(_ context.Context, routingTable string)
 
 // RouteListIPTable is a wrapper for netlink.RouteListIPTable
 func (n *NetlinkWrapper) RouteListIPTable(_ context.Context, vtip string) bool {
-	_, err := run([]string{"ip", "route", "list", "exact", vtip, "table", "local"}, false)
+	_, err := Run([]string{"ip", "route", "list", "exact", vtip, "table", "local"}, false)
 	return err == 0
 }
 
 // BridgeFdbAdd is a wrapper for netlink.BridgeFdbAdd
 func (n *NetlinkWrapper) BridgeFdbAdd(_ context.Context, link string, macAddress string) error {
-	_, err := run([]string{"bridge", "fdb", "add", macAddress, "dev", link, "master", "static", "extern_learn"}, false)
+	_, err := Run([]string{"bridge", "fdb", "add", macAddress, "dev", link, "master", "static", "extern_learn"}, false)
 	if err != 0 {
 		return errors.New("failed to add fdb entry")
 	}
@@ -322,9 +305,9 @@ func (n *NetlinkWrapper) ReadNeigh(_ context.Context, link string) (string, erro
 	var out string
 	var err int
 	if link == "" {
-		out, err = run([]string{"ip", "-j", "-d", "neighbor", "show"}, false)
+		out, err = Run([]string{"ip", "-j", "-d", "neighbor", "show"}, false)
 	} else {
-		out, err = run([]string{"ip", "-j", "-d", "neighbor", "show", "vrf", link}, false)
+		out, err = Run([]string{"ip", "-j", "-d", "neighbor", "show", "vrf", link}, false)
 	}
 	if err != 0 {
 		return "", errors.New("failed routelookup")
@@ -334,7 +317,7 @@ func (n *NetlinkWrapper) ReadNeigh(_ context.Context, link string) (string, erro
 
 // ReadRoute is a wrapper for netlink.ReadRoute
 func (n *NetlinkWrapper) ReadRoute(_ context.Context, table string) (string, error) {
-	out, err := run([]string{"ip", "-j", "-d", "route", "show", "table", table}, false)
+	out, err := Run([]string{"ip", "-j", "-d", "route", "show", "table", table}, false)
 	if err != 0 {
 		return "", errors.New("failed to read route")
 	}
@@ -343,7 +326,7 @@ func (n *NetlinkWrapper) ReadRoute(_ context.Context, table string) (string, err
 
 // ReadFDB is a wrapper for netlink.ReadFDB
 func (n *NetlinkWrapper) ReadFDB(_ context.Context) (string, error) {
-	out, err := run([]string{"bridge", "-d", "-j", "fdb", "show", "br", "br-tenant", "dynamic"}, false)
+	out, err := Run([]string{"bridge", "-d", "-j", "fdb", "show", "br", "br-tenant", "dynamic"}, false)
 	if err != 0 {
 		return "", errors.New("failed to read fdb")
 	}
@@ -355,9 +338,9 @@ func (n *NetlinkWrapper) RouteLookup(_ context.Context, dst string, link string)
 	var out string
 	var err int
 	if link == "" {
-		out, err = run([]string{"ip", "-j", "route", "get", dst, "fibmatch"}, false)
+		out, err = Run([]string{"ip", "-j", "route", "get", dst, "fibmatch"}, false)
 	} else {
-		out, err = run([]string{"ip", "-j", "route", "get", dst, "vrf", link, "fibmatch"}, false)
+		out, err = Run([]string{"ip", "-j", "route", "get", dst, "vrf", link, "fibmatch"}, false)
 	}
 	if err != 0 {
 		return "", errors.New("failed routelookup")
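The exported Run keeps the (output, rc) convention of the old netlink-private helper rather than returning an error value, so callers compare the return code against 0. A usage sketch of the exported form (the route-table argument here is arbitrary):

```go
package main

import (
	"log"

	"github.com/opiproject/opi-evpn-bridge/pkg/utils"
)

func main() {
	// rc is 0 on success and -1 on failure; on failure the output
	// string carries only a generic error message.
	out, rc := utils.Run([]string{"ip", "-j", "route", "show", "table", "254"}, false)
	if rc != 0 {
		log.Fatalf("command failed: %s", out)
	}
	log.Printf("routes: %s", out)
}
```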