Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete secondaryNetwork OVS ports correctly after an Agent restart #6853

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,19 @@ func run(o *Options) error {
}
}

// Secondary network controller should be created before CNIServer.Run() to make sure no Pod CNI updates will be missed.
var secondaryNetworkController *secondarynetwork.Controller
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
secondaryNetworkController, err = secondarynetwork.NewController(
o.config.ClientConnection, o.config.KubeAPIServerOverride,
k8sClient, localPodInformer.Get(),
podUpdateChannel, ifaceStore,
&o.config.SecondaryNetwork, ovsdbConnection)
if err != nil {
return fmt.Errorf("failed to create secondary network controller: %w", err)
}
}

var traceflowController *traceflow.Controller
if features.DefaultFeatureGate.Enabled(features.Traceflow) {
traceflowController = traceflow.NewTraceflowController(
Expand Down Expand Up @@ -760,18 +773,6 @@ func run(o *Options) error {
go ipamController.Run(stopCh)
}

var secondaryNetworkController *secondarynetwork.Controller
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
secondaryNetworkController, err = secondarynetwork.NewController(
o.config.ClientConnection, o.config.KubeAPIServerOverride,
k8sClient, localPodInformer.Get(),
podUpdateChannel,
&o.config.SecondaryNetwork, ovsdbConnection)
if err != nil {
return fmt.Errorf("failed to create secondary network controller: %w", err)
}
}

var bgpController *bgp.Controller
if features.DefaultFeatureGate.Enabled(features.BGPPolicy) {
bgpPolicyInformer := crdInformerFactory.Crd().V1alpha1().BGPPolicies()
Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/secondarynetwork/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/secondarynetwork/podwatch"
agentconfig "antrea.io/antrea/pkg/config/agent"
"antrea.io/antrea/pkg/ovs/ovsconfig"
Expand All @@ -47,6 +48,7 @@ func NewController(
k8sClient clientset.Interface,
podInformer cache.SharedIndexInformer,
podUpdateSubscriber channel.Subscriber,
pIfaceStore interfacestore.InterfaceStore,
secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB,
) (*Controller, error) {
ovsBridgeClient, err := createOVSBridge(secNetConfig.OVSBridges, ovsdb)
Expand All @@ -65,7 +67,7 @@ func NewController(
// k8s.v1.cni.cncf.io/networks Annotation defined.
podWatchController, err := podwatch.NewPodController(
k8sClient, netAttachDefClient, podInformer,
podUpdateSubscriber, ovsBridgeClient)
podUpdateSubscriber, pIfaceStore, ovsBridgeClient)
if err != nil {
return nil, err
}
Expand Down
92 changes: 92 additions & 0 deletions pkg/agent/secondarynetwork/podwatch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func NewPodController(
netAttachDefClient netdefclient.K8sCniCncfIoV1Interface,
podInformer cache.SharedIndexInformer,
podUpdateSubscriber channel.Subscriber,
pIfaceStore interfacestore.InterfaceStore,
ovsBridgeClient ovsconfig.OVSBridgeClient,
) (*PodController, error) {
ifaceStore := interfacestore.NewInterfaceStore()
Expand Down Expand Up @@ -133,6 +134,15 @@ func NewPodController(
},
resyncPeriod,
)

if err := pc.initializeSecondaryInterfaceStore(); err != nil {
return nil, fmt.Errorf("failed to initialize secondary interface store: %v", err)
}

if err := pc.reconcileSecondaryInterfaces(pIfaceStore); err != nil {
return nil, fmt.Errorf("failed to restore CNI cache and reconcile secondary interfaces: %v", err)
}

// podUpdateSubscriber can be nil with test code.
if podUpdateSubscriber != nil {
// Subscribe Pod CNI add/del events.
Expand Down Expand Up @@ -502,3 +512,85 @@ func checkForPodSecondaryNetworkAttachement(pod *corev1.Pod) (string, bool) {
return netObj, false
}
}

// initializeSecondaryInterfaceStore restores secondary interfaceStore when agent restarts.
func (pc *PodController) initializeSecondaryInterfaceStore() error {
if pc.ovsBridgeClient == nil {
return nil
}

ovsPorts, err := pc.ovsBridgeClient.GetPortList()
if err != nil {
return fmt.Errorf("failed to list OVS ports for the secondary bridge: %v", err)
}

ifaceList := make([]*interfacestore.InterfaceConfig, 0, len(ovsPorts))
for index := range ovsPorts {
port := &ovsPorts[index]
ovsPort := &interfacestore.OVSPortConfig{
PortUUID: port.UUID,
OFPort: port.OFPort,
}

interfaceType, ok := port.ExternalIDs[interfacestore.AntreaInterfaceTypeKey]
if !ok {
klog.InfoS("Interface type is not set for the secondary bridge", "interfaceName", port.Name)
continue
}

var intf *interfacestore.InterfaceConfig
switch interfaceType {
case interfacestore.AntreaContainer:
intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort)
default:
klog.InfoS("Unknown Antrea interface type for the secondary bridge", "type", interfaceType)
KMAnju-2021 marked this conversation as resolved.
Show resolved Hide resolved
}

if intf != nil {
ifaceList = append(ifaceList, intf)
}

}

pc.interfaceStore.Initialize(ifaceList)
KMAnju-2021 marked this conversation as resolved.
Show resolved Hide resolved
klog.InfoS("Successfully initialized the secondary bridge interface store")

return nil
}

// reconcileSecondaryInterfaces runs after an agent restart. It first rebuilds
// cniCache from the container interfaces recorded in the primary
// interfaceStore, then removes secondary interfaces that are stale: those whose
// container ID is unknown to the primary store, or whose OFPort is -1.
//
// A nil primary store skips reconciliation entirely. Failures while removing
// stale interfaces are logged and not propagated, so the function always
// returns nil.
func (pc *PodController) reconcileSecondaryInterfaces(pIfaceStore interfacestore.InterfaceStore) error {
	if pIfaceStore == nil {
		klog.InfoS("Primary interfaceStore is nil, skipping reconciliation for Secondary Network")
		return nil
	}

	// Step 1: one cniCache entry per Pod that has a primary container interface.
	for _, primaryIface := range pIfaceStore.GetInterfacesByType(interfacestore.ContainerInterface) {
		containerCfg := primaryIface.ContainerInterfaceConfig
		pc.cniCache.Store(
			podKeyGet(containerCfg.PodName, containerCfg.PodNamespace),
			&podCNIInfo{containerID: containerCfg.ContainerID},
		)
	}

	// Step 2: collect the secondary interfaces that no longer have a valid owner.
	var stale []*interfacestore.InterfaceConfig
	for _, secondaryIface := range pc.interfaceStore.GetInterfacesByType(interfacestore.ContainerInterface) {
		_, known := pIfaceStore.GetContainerInterface(secondaryIface.ContainerID)
		if known && secondaryIface.OFPort != -1 {
			// Still backed by a primary interface and a valid OVS port: keep it.
			continue
		}
		stale = append(stale, secondaryIface)
	}

	// Step 3: hand the stale interfaces, if any, to removeInterfaces().
	if len(stale) == 0 {
		return nil
	}
	if err := pc.removeInterfaces(stale); err != nil {
		klog.ErrorS(err, "Failed to remove stale secondary interfaces", "staleInterfaces", stale)
	}
	return nil
}
146 changes: 145 additions & 1 deletion pkg/agent/secondarynetwork/podwatch/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"time"

current "github.com/containernetworking/cni/pkg/types/100"
"github.com/google/uuid"
netdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
netdefclientfake "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/fake"
"github.com/stretchr/testify/assert"
Expand All @@ -43,12 +44,15 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/workqueue"

"antrea.io/antrea/pkg/agent/cniserver"
"antrea.io/antrea/pkg/agent/cniserver/ipam"
cnitypes "antrea.io/antrea/pkg/agent/cniserver/types"
"antrea.io/antrea/pkg/agent/interfacestore"
podwatchtesting "antrea.io/antrea/pkg/agent/secondarynetwork/podwatch/testing"
"antrea.io/antrea/pkg/agent/types"
crdv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1"
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
)

const (
Expand Down Expand Up @@ -205,7 +209,7 @@ func TestPodControllerRun(t *testing.T) {
client,
netdefclient,
informerFactory.Core().V1().Pods().Informer(),
nil, nil)
nil, nil, nil)
podController.interfaceConfigurator = interfaceConfigurator
podController.ipamAllocator = mockIPAM
cniCache := &podController.cniCache
Expand Down Expand Up @@ -936,3 +940,143 @@ func testPodControllerStart(ctrl *gomock.Controller) (
informerFactory.WaitForCacheSync(stopCh)
return podController, mockIPAM, interfaceConfigurator
}

// convertExternalIDMap copies an external-ID map with untyped values into a
// map[string]string. Every value must be a string; a value of any other type
// makes the type assertion panic.
func convertExternalIDMap(in map[string]interface{}) map[string]string {
	converted := make(map[string]string, len(in))
	for key := range in {
		converted[key] = in[key].(string)
	}
	return converted
}

// createTestInterfaces builds the fixtures shared by the secondary interface
// store tests. It returns:
//   - a map from the labels "uuid1".."uuid4" to the generated UUIDs; uuid1-3 are
//     used both as port UUID and as container ID of p1-p3, uuid4 is the UUID of
//     the "unknownIface" port;
//   - four OVS ports: p1 and p2 with valid OFPorts, p3 with OFPort -1, and
//     "unknownIface", whose external IDs contain only an unrelated key;
//   - the InterfaceConfigs parsed from p1, p2 and p3, in that order.
func createTestInterfaces() (map[string]string, []ovsconfig.OVSPortData, []*interfacestore.InterfaceConfig) {
	uuid1 := uuid.New().String()
	uuid2 := uuid.New().String()
	uuid3 := uuid.New().String()
	// The unknown port gets its own UUID so that no two ports share one.
	uuid4 := uuid.New().String()

	p1MAC, p1IP := "11:22:33:44:55:66", "192.168.1.10"
	p2MAC, p2IP := "11:22:33:44:55:77", "192.168.1.11"

	// ParseMAC errors are ignored on purpose: the inputs are valid constant literals.
	p1NetMAC, _ := net.ParseMAC(p1MAC)
	p1NetIP := net.ParseIP(p1IP)
	p2NetMAC, _ := net.ParseMAC(p2MAC)
	p2NetIP := net.ParseIP(p2IP)

	ovsPort1 := ovsconfig.OVSPortData{
		UUID: uuid1, Name: "p1", OFPort: 11,
		ExternalIDs: convertExternalIDMap(cniserver.BuildOVSPortExternalIDs(
			interfacestore.NewContainerInterface("p1", uuid1, "pod1", "ns1", "eth0", p1NetMAC, []net.IP{p1NetIP}, 100)))}

	ovsPort2 := ovsconfig.OVSPortData{
		UUID: uuid2, Name: "p2", OFPort: 12,
		ExternalIDs: convertExternalIDMap(cniserver.BuildOVSPortExternalIDs(
			interfacestore.NewContainerInterface("p2", uuid2, "pod2", "ns2", "eth0", p2NetMAC, []net.IP{p2NetIP}, 100)))}

	// p3 has OFPort -1; it reuses the MAC and IP of p2.
	ovsPort3 := ovsconfig.OVSPortData{
		UUID: uuid3, Name: "p3", OFPort: -1,
		ExternalIDs: convertExternalIDMap(cniserver.BuildOVSPortExternalIDs(
			interfacestore.NewContainerInterface("p3", uuid3, "pod3", "ns3", "eth0", p2NetMAC, []net.IP{p2NetIP}, 100)))}

	ovsPort4 := ovsconfig.OVSPortData{
		UUID:   uuid4,
		Name:   "unknownIface",
		OFPort: 20,
		ExternalIDs: map[string]string{
			"unknownKey": "unknownValue"}}

	// Interface configurations
	iface1 := cniserver.ParseOVSPortInterfaceConfig(&ovsPort1, &interfacestore.OVSPortConfig{PortUUID: ovsPort1.UUID, OFPort: ovsPort1.OFPort})
	iface2 := cniserver.ParseOVSPortInterfaceConfig(&ovsPort2, &interfacestore.OVSPortConfig{PortUUID: ovsPort2.UUID, OFPort: ovsPort2.OFPort})
	iface3 := cniserver.ParseOVSPortInterfaceConfig(&ovsPort3, &interfacestore.OVSPortConfig{PortUUID: ovsPort3.UUID, OFPort: ovsPort3.OFPort})

	uuids := map[string]string{"uuid1": uuid1, "uuid2": uuid2, "uuid3": uuid3, "uuid4": uuid4}
	ports := []ovsconfig.OVSPortData{ovsPort1, ovsPort2, ovsPort3, ovsPort4}
	ifaces := []*interfacestore.InterfaceConfig{iface1, iface2, iface3}
	return uuids, ports, ifaces
}

// setupMockController creates a gomock controller together with mock OVS
// bridge client, interface configurator and IPAM allocator, and returns them
// along with a PodController wired to those mocks and to a fresh, empty
// interface store and CNI cache.
func setupMockController(t *testing.T) (*gomock.Controller, *ovsconfigtest.MockOVSBridgeClient, *podwatchtesting.MockInterfaceConfigurator, *podwatchtesting.MockIPAMAllocator, *PodController) {
	mockCtrl := gomock.NewController(t)
	bridgeClient := ovsconfigtest.NewMockOVSBridgeClient(mockCtrl)
	configurator := podwatchtesting.NewMockInterfaceConfigurator(mockCtrl)
	ipamAllocator := podwatchtesting.NewMockIPAMAllocator(mockCtrl)

	podController := &PodController{
		ovsBridgeClient:       bridgeClient,
		interfaceStore:        interfacestore.NewInterfaceStore(),
		cniCache:              sync.Map{},
		interfaceConfigurator: configurator,
		ipamAllocator:         ipamAllocator,
	}
	return mockCtrl, bridgeClient, configurator, ipamAllocator, podController
}

func TestInitializeSecondaryInterfaceStore(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()

// Test Case 1: OVSBridgeClient is nil
store := interfacestore.NewInterfaceStore()
pc := &PodController{
ovsBridgeClient: nil,
interfaceStore: store,
}
err := pc.initializeSecondaryInterfaceStore()
assert.NoError(t, err, "No error when OVSBridgeClient is nil")

// Test Case 2: OVSBridgeClient returns an error
ctrl, mockOVSBridgeClient, _, _, pc := setupMockController(t)
defer ctrl.Finish()

mockOVSBridgeClient.EXPECT().GetPortList().Return(nil, ovsconfig.NewTransactionError(fmt.Errorf("Failed to list OVS ports"), true))
err = pc.initializeSecondaryInterfaceStore()
assert.Error(t, err, "Failed to list OVS ports")

// Test Case 3: OVSBridgeClient returns valid ports
uuids, ovsPorts, _ := createTestInterfaces()
mockOVSBridgeClient.EXPECT().GetPortList().Return(ovsPorts, nil)

err = pc.initializeSecondaryInterfaceStore()
assert.NoError(t, err, "OVS ports list successfully")

// Validate stored interfaces
assert.Equal(t, 3, pc.interfaceStore.Len(), "Only valid interfaces should be stored")
_, found1 := pc.interfaceStore.GetContainerInterface(uuids["uuid1"])
assert.True(t, found1, "Interface 1 should be stored")
_, found2 := pc.interfaceStore.GetContainerInterface(uuids["uuid2"])
assert.True(t, found2, "Interface 2 should be stored")
_, found3 := pc.interfaceStore.GetContainerInterface(uuids["uuid4"])
assert.False(t, found3, "Unknown interface type should not be stored")
}

// TestReconcileSecondaryInterfaces verifies reconcileSecondaryInterfaces:
//   - with a nil primary store it does nothing and returns no error;
//   - otherwise it restores one cniCache entry per Pod in the primary store and
//     removes exactly the one secondary interface (p3) that is not present in
//     the primary store.
func TestReconcileSecondaryInterfaces(t *testing.T) {
	t.Run("primary interfaceStore is nil", func(t *testing.T) {
		// No expectations are set on the mocks: any removal attempt fails the test.
		_, _, _, _, pc := setupMockController(t)
		assert.NoError(t, pc.reconcileSecondaryInterfaces(nil))
		_, foundPod1 := pc.cniCache.Load("ns1/pod1")
		assert.False(t, foundPod1, "CNI Cache should stay empty when reconciliation is skipped")
	})

	t.Run("stale secondary interface is removed", func(t *testing.T) {
		_, _, interfaceConfigurator, mockIPAM, pc := setupMockController(t)
		primaryStore := interfacestore.NewInterfaceStore()

		_, _, ifaces := createTestInterfaces()

		// Add interfaces to primary store
		primaryStore.AddInterface(ifaces[0])
		primaryStore.AddInterface(ifaces[1])

		// Add interfaces to controller secondaryInterfaceStore
		pc.interfaceStore.AddInterface(ifaces[0])
		pc.interfaceStore.AddInterface(ifaces[1])
		pc.interfaceStore.AddInterface(ifaces[2])

		// Exactly one removal is expected: only ifaces[2] is stale.
		interfaceConfigurator.EXPECT().DeleteVLANSecondaryInterface(gomock.Any()).Return(nil).Times(1)
		mockIPAM.EXPECT().SecondaryNetworkRelease(gomock.Any()).Return(nil).Times(1)

		err := pc.reconcileSecondaryInterfaces(primaryStore)
		assert.NoError(t, err)

		// Check CNI Cache: one entry per Pod of the primary store, carrying its container ID.
		for i, podKey := range []string{"ns1/pod1", "ns2/pod2"} {
			cached, found := pc.cniCache.Load(podKey)
			if assert.True(t, found, "CNI Cache should contain %s", podKey) {
				assert.Equal(t, ifaces[i].ContainerID, cached.(*podCNIInfo).containerID)
			}
		}

		// pod3 only exists in the secondary store, so it must not be restored into the cache.
		_, foundPod3 := pc.cniCache.Load("ns3/pod3")
		assert.False(t, foundPod3, "CNI Cache should not contain ns3/pod3, which is absent from the primary store")
	})
}
Loading
Loading