From d47f26c27d523fdfc45b30b721f9057e00e62896 Mon Sep 17 00:00:00 2001 From: Claes Mogren Date: Fri, 7 Aug 2020 20:36:11 -0700 Subject: [PATCH] Add vlan support to ipamd --- cmd/routed-eni-cni-plugin/cni.go | 8 +- cmd/routed-eni-cni-plugin/driver/driver.go | 1 - config/v1.6/aws-k8s-cni.yaml | 6 +- pkg/awsutils/awsutils.go | 19 ++- pkg/awsutils/awsutils_test.go | 2 +- pkg/awsutils/mocks/awsutils_mocks.go | 7 +- pkg/eniconfig/eniconfig.go | 7 +- pkg/eniconfig/eniconfig_test.go | 3 +- pkg/ipamd/datastore/data_store.go | 23 ++- pkg/ipamd/datastore/data_store_test.go | 53 +++--- pkg/ipamd/ipamd.go | 178 +++++++++++++++++++-- pkg/ipamd/ipamd_test.go | 38 ++++- pkg/ipamd/rpc_handler.go | 117 +++++++++++++- pkg/k8sapi/discovery.go | 46 ++++++ pkg/k8sapi/mocks/k8sapi_mocks.go | 94 +++++++++++ pkg/networkutils/mocks/network_mocks.go | 24 ++- pkg/networkutils/network.go | 69 ++++++-- pkg/networkutils/network_test.go | 46 ++++-- 18 files changed, 649 insertions(+), 92 deletions(-) create mode 100644 pkg/k8sapi/mocks/k8sapi_mocks.go diff --git a/cmd/routed-eni-cni-plugin/cni.go b/cmd/routed-eni-cni-plugin/cni.go index b6a4dcb35c..165804faa6 100644 --- a/cmd/routed-eni-cni-plugin/cni.go +++ b/cmd/routed-eni-cni-plugin/cni.go @@ -190,7 +190,6 @@ func add(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap // build hostVethName // Note: the maximum length for linux interface name is 15 hostVethName := generateHostVethName(conf.VethPrefix, conf.Name, args.ContainerID, args.IfName) - err = driverClient.SetupNS(hostVethName, args.IfName, args.Netns, addr, int(r.DeviceNumber), r.VPCcidrs, r.UseExternalSNAT, mtu, log) } @@ -308,6 +307,13 @@ func del(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap log.Infof("Received del network response for pod %s namespace %s sandbox %s: %+v", string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID), r) + log.Infof("Received del network response for pod %s 
namespace %s sandbox %s: %+v", string(k8sArgs.K8S_POD_NAME), + string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID), r) + + if r.IPv4Addr == "" { + log.Info("Pod not found, or IPv4Addr is empty. Skipping TeardownNS.") + return nil + } deletedPodIP := net.ParseIP(r.IPv4Addr) if deletedPodIP != nil { addr := &net.IPNet{ diff --git a/cmd/routed-eni-cni-plugin/driver/driver.go b/cmd/routed-eni-cni-plugin/driver/driver.go index 01e00a0e22..bb0497804e 100644 --- a/cmd/routed-eni-cni-plugin/driver/driver.go +++ b/cmd/routed-eni-cni-plugin/driver/driver.go @@ -440,7 +440,6 @@ func tearDownNS(addr *net.IPNet, table int, netLink netlinkwrapper.NetLink, log toContainerRule.Dst = addr toContainerRule.Priority = toContainerRulePriority err := netLink.RuleDel(toContainerRule) - if err != nil { log.Errorf("Failed to delete toContainer rule for %s err %v", addr.String(), err) } else { diff --git a/config/v1.6/aws-k8s-cni.yaml b/config/v1.6/aws-k8s-cni.yaml index 52558e5be7..41e9d52ce3 100644 --- a/config/v1.6/aws-k8s-cni.yaml +++ b/config/v1.6/aws-k8s-cni.yaml @@ -13,9 +13,13 @@ rules: - apiGroups: [""] resources: - pods - - nodes - namespaces verbs: ["list", "watch", "get"] + - apiGroups: [""] + resources: + - nodes + # Needs "update" to be able to label the node it is running on + verbs: ["list", "watch", "get", "update"] - apiGroups: ["extensions"] resources: - daemonsets diff --git a/pkg/awsutils/awsutils.go b/pkg/awsutils/awsutils.go index 14b6f88ee6..b51e7a68dc 100644 --- a/pkg/awsutils/awsutils.go +++ b/pkg/awsutils/awsutils.go @@ -126,7 +126,7 @@ type APIs interface { GetIPv4sFromEC2(eniID string) (addrList []*ec2.NetworkInterfacePrivateIpAddress, err error) // DescribeAllENIs calls EC2 and returns the ENIMetadata and a tag map for each ENI - DescribeAllENIs() ([]ENIMetadata, map[string]TagMap, error) + DescribeAllENIs() (eniMetadata []ENIMetadata, tagMap map[string]TagMap, trunkENI string, err error) // AllocIPAddress allocates an IP address for 
an ENI AllocIPAddress(eniID string) error @@ -729,6 +729,7 @@ func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string } else { log.Info("Using same config as the primary interface for the new ENI") } + var sgs []string for i := range input.Groups { sgs = append(sgs, *input.Groups[i]) @@ -998,11 +999,11 @@ func (cache *EC2InstanceMetadataCache) GetIPv4sFromEC2(eniID string) (addrList [ } // DescribeAllENIs calls EC2 to refrech the ENIMetadata and tags for all attached ENIs -func (cache *EC2InstanceMetadataCache) DescribeAllENIs() ([]ENIMetadata, map[string]TagMap, error) { +func (cache *EC2InstanceMetadataCache) DescribeAllENIs() (eniMetadata []ENIMetadata, tagMap map[string]TagMap, trunkENI string, err error) { // Fetch all local ENI info from metadata allENIs, err := cache.GetAttachedENIs() if err != nil { - return nil, nil, errors.Wrap(err, "DescribeAllENIs: failed to get local ENI metadata") + return nil, nil, "", errors.Wrap(err, "DescribeAllENIs: failed to get local ENI metadata") } eniMap := make(map[string]ENIMetadata, len(allENIs)) @@ -1047,7 +1048,7 @@ func (cache *EC2InstanceMetadataCache) DescribeAllENIs() ([]ENIMetadata, map[str } if err != nil { - return nil, nil, err + return nil, nil, "", err } // Collect the verified ENIs @@ -1057,13 +1058,19 @@ func (cache *EC2InstanceMetadataCache) DescribeAllENIs() ([]ENIMetadata, map[str } // Collect ENI response into ENI metadata and tags. 
- tagMap := make(map[string]TagMap, len(ec2Response.NetworkInterfaces)) + tagMap = make(map[string]TagMap, len(ec2Response.NetworkInterfaces)) for _, ec2res := range ec2Response.NetworkInterfaces { if ec2res.Attachment != nil && aws.Int64Value(ec2res.Attachment.DeviceIndex) == 0 && !aws.BoolValue(ec2res.Attachment.DeleteOnTermination) { log.Warn("Primary ENI will not get deleted when node terminates because 'delete_on_termination' is set to false") } eniID := aws.StringValue(ec2res.NetworkInterfaceId) eniMetadata := eniMap[eniID] + interfaceType := aws.StringValue(ec2res.InterfaceType) + log.Infof("XXXXXXX Got interface type: %q", interfaceType) // TODO Debug only... + // TODO: This assumes we only have one trunk attached to the node.. + if interfaceType == "trunk" || interfaceType == "vlan" { + trunkENI = eniID + } // Check IPv4 addresses logOutOfSyncState(eniID, eniMetadata.IPv4Addresses, ec2res.PrivateIpAddresses) tags := make(map[string]string, len(ec2res.TagSet)) @@ -1078,7 +1085,7 @@ func (cache *EC2InstanceMetadataCache) DescribeAllENIs() ([]ENIMetadata, map[str tagMap[eniMetadata.ENIID] = tags } } - return verifiedENIs, tagMap, nil + return verifiedENIs, tagMap, trunkENI, nil } var eniErrorMessageRegex = regexp.MustCompile("'([a-zA-Z0-9-]+)'") diff --git a/pkg/awsutils/awsutils_test.go b/pkg/awsutils/awsutils_test.go index d40cabc414..1a87453a02 100644 --- a/pkg/awsutils/awsutils_test.go +++ b/pkg/awsutils/awsutils_test.go @@ -393,7 +393,7 @@ func TestDescribeAllENIs(t *testing.T) { for _, tc := range testCases { mockEC2.EXPECT().DescribeNetworkInterfacesWithContext(gomock.Any(), gomock.Any(), gomock.Any()).Times(tc.n).Return(result, tc.awsErr) ins := &EC2InstanceMetadataCache{ec2Metadata: mockMetadata, ec2SVC: mockEC2} - _, tags, err := ins.DescribeAllENIs() + _, tags, _, err := ins.DescribeAllENIs() assert.Equal(t, tc.expErr, err, tc.name) assert.Equal(t, tc.exptags, tags, tc.name) } diff --git a/pkg/awsutils/mocks/awsutils_mocks.go 
b/pkg/awsutils/mocks/awsutils_mocks.go index eb5780d943..6b875e0699 100644 --- a/pkg/awsutils/mocks/awsutils_mocks.go +++ b/pkg/awsutils/mocks/awsutils_mocks.go @@ -107,13 +107,14 @@ func (mr *MockAPIsMockRecorder) DeallocIPAddresses(arg0, arg1 interface{}) *gomo } // DescribeAllENIs mocks base method -func (m *MockAPIs) DescribeAllENIs() ([]awsutils.ENIMetadata, map[string]awsutils.TagMap, error) { +func (m *MockAPIs) DescribeAllENIs() ([]awsutils.ENIMetadata, map[string]awsutils.TagMap, string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DescribeAllENIs") ret0, _ := ret[0].([]awsutils.ENIMetadata) ret1, _ := ret[1].(map[string]awsutils.TagMap) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + ret2, _ := ret[2].(string) + ret3, _ := ret[3].(error) + return ret0, ret1, ret2, ret3 } // DescribeAllENIs indicates an expected call of DescribeAllENIs diff --git a/pkg/eniconfig/eniconfig.go b/pkg/eniconfig/eniconfig.go index eb0757b54d..a3c0e80baa 100644 --- a/pkg/eniconfig/eniconfig.go +++ b/pkg/eniconfig/eniconfig.go @@ -126,12 +126,17 @@ func (h *Handler) Handle(ctx context.Context, event sdk.Event) error { val = eniConfigDefault } } - + // If value changes if h.controller.myENI != val { h.controller.eniLock.Lock() defer h.controller.eniLock.Unlock() h.controller.myENI = val log.Debugf("Setting myENI to: %s", val) + if val != eniConfigDefault { + labels := o.GetLabels() + labels["vpc.amazonaws.com/eniConfig"] = val + o.SetLabels(labels) + } } } } diff --git a/pkg/eniconfig/eniconfig_test.go b/pkg/eniconfig/eniconfig_test.go index 6bc91ff0af..6ed50ae8ea 100644 --- a/pkg/eniconfig/eniconfig_test.go +++ b/pkg/eniconfig/eniconfig_test.go @@ -46,7 +46,8 @@ func updateNodeAnnotation(hdlr sdk.Handler, nodeName string, configName string, node := corev1.Node{ TypeMeta: metav1.TypeMeta{APIVersion: corev1.SchemeGroupVersion.String()}, ObjectMeta: metav1.ObjectMeta{ - Name: nodeName, + Name: nodeName, + Labels: make(map[string]string), }, } accessor, err := 
meta.Accessor(&node) diff --git a/pkg/ipamd/datastore/data_store.go b/pkg/ipamd/datastore/data_store.go index 0d26622c1c..9aa391befe 100644 --- a/pkg/ipamd/datastore/data_store.go +++ b/pkg/ipamd/datastore/data_store.go @@ -115,13 +115,15 @@ func (k IPAMKey) String() string { return fmt.Sprintf("%s/%s/%s", k.NetworkName, k.ContainerID, k.IfName) } -// ENI represents a single ENI. +// ENI represents a single ENI. Exported fields will be marshaled for introspection. type ENI struct { // AWS ENI ID ID string createTime time.Time // IsPrimary indicates whether ENI is a primary ENI IsPrimary bool + // IsTrunk indicates whether this ENI is used to provide pods with dedicated ENIs + IsTrunk bool // DeviceNumber is the device number of ENI (0 means the primary ENI) DeviceNumber int // IPv4Addresses shows whether each address is assigned, the key is IP address, which must @@ -355,7 +357,7 @@ func (ds *DataStore) writeBackingStoreUnsafe() error { } // AddENI add ENI to data store -func (ds *DataStore) AddENI(eniID string, deviceNumber int, isPrimary bool) error { +func (ds *DataStore) AddENI(eniID string, deviceNumber int, isPrimary, isTrunk bool) error { ds.lock.Lock() defer ds.lock.Unlock() @@ -368,6 +370,7 @@ func (ds *DataStore) AddENI(eniID string, deviceNumber int, isPrimary bool) erro ds.eniPool[eniID] = &ENI{ createTime: time.Now(), IsPrimary: isPrimary, + IsTrunk: isTrunk, ID: eniID, DeviceNumber: deviceNumber, IPv4Addresses: make(map[string]*AddressInfo)} @@ -505,6 +508,17 @@ func (ds *DataStore) GetStats() (int, int) { return ds.total, ds.assigned } +func (ds *DataStore) GetTrunkENI() string { + ds.lock.Lock() + defer ds.lock.Unlock() + for _, eni := range ds.eniPool { + if eni.IsTrunk { + return eni.ID + } + } + return "" +} + // IsRequiredForWarmIPTarget determines if this ENI has warm IPs that are required to fulfill whatever WARM_IP_TARGET is // set to. 
func (ds *DataStore) isRequiredForWarmIPTarget(warmIPTarget int, eni *ENI) bool { @@ -561,6 +575,11 @@ func (ds *DataStore) getDeletableENI(warmIPTarget int, minimumIPTarget int) *ENI continue } + if eni.IsTrunk { + ds.log.Debugf("ENI %s cannot be deleted because it is a trunk ENI", eni.ID) + continue + } + ds.log.Debugf("getDeletableENI: found a deletable ENI %s", eni.ID) return eni } diff --git a/pkg/ipamd/datastore/data_store_test.go b/pkg/ipamd/datastore/data_store_test.go index 9ef6ee0847..6a37edf348 100644 --- a/pkg/ipamd/datastore/data_store_test.go +++ b/pkg/ipamd/datastore/data_store_test.go @@ -34,13 +34,13 @@ var log = logger.New(&logConfig) func TestAddENI(t *testing.T) { ds := NewDataStore(log, NullCheckpoint{}) - err := ds.AddENI("eni-1", 1, true) + err := ds.AddENI("eni-1", 1, true, false) assert.NoError(t, err) - err = ds.AddENI("eni-1", 1, true) + err = ds.AddENI("eni-1", 1, true, false) assert.Error(t, err) - err = ds.AddENI("eni-2", 2, false) + err = ds.AddENI("eni-2", 2, false, false) assert.NoError(t, err) assert.Equal(t, len(ds.eniPool), 2) @@ -52,13 +52,13 @@ func TestAddENI(t *testing.T) { func TestDeleteENI(t *testing.T) { ds := NewDataStore(log, NullCheckpoint{}) - err := ds.AddENI("eni-1", 1, true) + err := ds.AddENI("eni-1", 1, true, false) assert.NoError(t, err) - err = ds.AddENI("eni-2", 2, false) + err = ds.AddENI("eni-2", 2, false, false) assert.NoError(t, err) - err = ds.AddENI("eni-3", 3, false) + err = ds.AddENI("eni-3", 3, false, false) assert.NoError(t, err) eniInfos := ds.GetENIInfos() @@ -95,10 +95,10 @@ func TestDeleteENI(t *testing.T) { func TestAddENIIPv4Address(t *testing.T) { ds := NewDataStore(log, NullCheckpoint{}) - err := ds.AddENI("eni-1", 1, true) + err := ds.AddENI("eni-1", 1, true, false) assert.NoError(t, err) - err = ds.AddENI("eni-2", 2, false) + err = ds.AddENI("eni-2", 2, false, false) assert.NoError(t, err) err = ds.AddIPv4AddressToStore("eni-1", "1.1.1.1") @@ -133,10 +133,10 @@ func TestAddENIIPv4Address(t 
*testing.T) { func TestGetENIIPs(t *testing.T) { ds := NewDataStore(log, NullCheckpoint{}) - err := ds.AddENI("eni-1", 1, true) + err := ds.AddENI("eni-1", 1, true, false) assert.NoError(t, err) - err = ds.AddENI("eni-2", 2, false) + err = ds.AddENI("eni-2", 2, false, false) assert.NoError(t, err) err = ds.AddIPv4AddressToStore("eni-1", "1.1.1.1") @@ -165,7 +165,7 @@ func TestGetENIIPs(t *testing.T) { func TestDelENIIPv4Address(t *testing.T) { ds := NewDataStore(log, NullCheckpoint{}) - err := ds.AddENI("eni-1", 1, true) + err := ds.AddENI("eni-1", 1, true, false) assert.NoError(t, err) err = ds.AddIPv4AddressToStore("eni-1", "1.1.1.1") @@ -218,11 +218,14 @@ func TestPodIPv4Address(t *testing.T) { checkpoint := NewTestCheckpoint(struct{}{}) ds := NewDataStore(log, checkpoint) - ds.AddENI("eni-1", 1, true) + err := ds.AddENI("eni-1", 1, true, false) + assert.NoError(t, err) - ds.AddENI("eni-2", 2, false) + err = ds.AddENI("eni-2", 2, false, false) + assert.NoError(t, err) - ds.AddIPv4AddressToStore("eni-1", "1.1.1.1") + err = ds.AddIPv4AddressToStore("eni-1", "1.1.1.1") + assert.NoError(t, err) key1 := IPAMKey{"net0", "sandbox-1", "eth0"} ip, _, err := ds.AssignPodIPv4Address(key1) @@ -242,7 +245,8 @@ func TestPodIPv4Address(t *testing.T) { podsInfos := ds.AllocatedIPs() assert.Equal(t, len(podsInfos), 1) - ds.AddIPv4AddressToStore("eni-2", "1.1.2.2") + err = ds.AddIPv4AddressToStore("eni-2", "1.1.2.2") + assert.NoError(t, err) // duplicate add ip, _, err = ds.AssignPodIPv4Address(key1) // same id @@ -280,7 +284,8 @@ func TestPodIPv4Address(t *testing.T) { podsInfos = ds.AllocatedIPs() assert.Equal(t, len(podsInfos), 2) - ds.AddIPv4AddressToStore("eni-1", "1.1.1.2") + err = ds.AddIPv4AddressToStore("eni-1", "1.1.1.2") + assert.NoError(t, err) key3 := IPAMKey{"net0", "sandbox-3", "eth0"} ip, _, err = ds.AssignPodIPv4Address(key3) @@ -328,23 +333,23 @@ func TestPodIPv4Address(t *testing.T) { func TestWarmENIInteractions(t *testing.T) { ds := NewDataStore(log, 
NullCheckpoint{}) - ds.AddENI("eni-1", 1, true) - ds.AddENI("eni-2", 2, false) - ds.AddENI("eni-3", 3, false) + _ = ds.AddENI("eni-1", 1, true, false) + _ = ds.AddENI("eni-2", 2, false, false) + _ = ds.AddENI("eni-3", 3, false, false) - ds.AddIPv4AddressToStore("eni-1", "1.1.1.1") + _ = ds.AddIPv4AddressToStore("eni-1", "1.1.1.1") key1 := IPAMKey{"net0", "sandbox-1", "eth0"} _, _, err := ds.AssignPodIPv4Address(key1) assert.NoError(t, err) - ds.AddIPv4AddressToStore("eni-1", "1.1.1.2") + _ = ds.AddIPv4AddressToStore("eni-1", "1.1.1.2") key2 := IPAMKey{"net0", "sandbox-2", "eth0"} _, _, err = ds.AssignPodIPv4Address(key2) assert.NoError(t, err) - ds.AddIPv4AddressToStore("eni-2", "1.1.2.1") - ds.AddIPv4AddressToStore("eni-2", "1.1.2.2") - ds.AddIPv4AddressToStore("eni-3", "1.1.3.1") + _ = ds.AddIPv4AddressToStore("eni-2", "1.1.2.1") + _ = ds.AddIPv4AddressToStore("eni-2", "1.1.2.2") + _ = ds.AddIPv4AddressToStore("eni-3", "1.1.3.1") noWarmIPTarget := 0 diff --git a/pkg/ipamd/ipamd.go b/pkg/ipamd/ipamd.go index 2d1bc8b4be..05643e6bee 100644 --- a/pkg/ipamd/ipamd.go +++ b/pkg/ipamd/ipamd.go @@ -24,6 +24,9 @@ import ( "sync/atomic" "time" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/aws/amazon-vpc-cni-k8s/pkg/awsutils" "github.com/aws/amazon-vpc-cni-k8s/pkg/eniconfig" "github.com/aws/amazon-vpc-cni-k8s/pkg/ipamd/datastore" @@ -114,6 +117,12 @@ const ( // Specify where ipam should persist its current IP<->container allocations. envBackingStorePath = "AWS_VPC_K8S_CNI_BACKING_STORE" defaultBackingStorePath = "/var/run/aws-node/ipam.json" + + // envEnablePodENI is used to attach a Trunk ENI to every node. Required in order to give Branch ENIs to pods. + envEnablePodENI = "ENABLE_POD_ENI" + + // vpcENIConfigLabel is used by the VPC resource controller to pick the right ENI config. 
+ vpcENIConfigLabel = "vpc.amazonaws.com/eniConfig" ) var log = logger.Get() @@ -165,6 +174,13 @@ var ( }, []string{"reason"}, ) + podENIErr = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "awscni_pod_eni_error_count", + Help: "The number of errors encountered for pod ENIs", + }, + []string{"fn"}, + ) prometheusRegistered = false ) @@ -191,6 +207,8 @@ type IPAMContext struct { reconcileCooldownCache ReconcileCooldownCache terminating int32 // Flag to warn that the pod is about to shut down. disableENIProvisioning bool + enablePodENI bool + myNodeName string } // UnmanagedENISet keeps a set of ENI IDs for ENIs tagged with "node.k8s.amazonaws.com/no_manage" @@ -282,6 +300,7 @@ func prometheusRegister() { prometheus.MustRegister(reconcileCnt) prometheus.MustRegister(addIPCnt) prometheus.MustRegister(delIPCnt) + prometheus.MustRegister(podENIErr) prometheusRegistered = true } } @@ -309,6 +328,8 @@ func New(k8sapiClient kubernetes.Interface, eniConfig *eniconfig.ENIConfigContro c.minimumIPTarget = getMinimumIPTarget() c.useCustomNetworking = UseCustomNetworkCfg() c.disableENIProvisioning = disablingENIProvisioning() + c.enablePodENI = enablePodENI() + c.myNodeName = os.Getenv("MY_NODE_NAME") checkpointer := datastore.NewJSONFile(dsBackingStorePath()) c.dataStore = datastore.NewDataStore(log, checkpointer) @@ -340,12 +361,12 @@ func (c *IPAMContext) nodeInit() error { vpcCIDRs := c.awsClient.GetVPCIPv4CIDRs() primaryIP := net.ParseIP(c.awsClient.GetLocalIPv4()) - err = c.networkClient.SetupHostNetwork(vpcCIDRs, c.awsClient.GetPrimaryENImac(), &primaryIP) + err = c.networkClient.SetupHostNetwork(vpcCIDRs, c.awsClient.GetPrimaryENImac(), &primaryIP, c.enablePodENI) if err != nil { return errors.Wrap(err, "ipamd init: failed to set up host network") } - eniMetadata, tagMap, err := c.awsClient.DescribeAllENIs() + eniMetadata, tagMap, trunkENI, err := c.awsClient.DescribeAllENIs() if err != nil { return errors.New("ipamd init: failed to retrieve attached ENIs 
info") } @@ -359,7 +380,7 @@ func (c *IPAMContext) nodeInit() error { retry := 0 for { retry++ - if err = c.setupENI(eni.ENIID, eni); err == nil { + if err = c.setupENI(eni.ENIID, eni, trunkENI); err == nil { log.Infof("ENI %s set up.", eni.ENIID) break } @@ -389,6 +410,39 @@ func (c *IPAMContext) nodeInit() error { return err } + if c.useCustomNetworking && c.eniConfig.Getter().MyENI != "default" { + // Signal to VPC Resource Controller that the node is using custom networking + err := c.SetNodeLabel(vpcENIConfigLabel, c.eniConfig.Getter().MyENI) + if err != nil { + log.Errorf("Failed to set eniConfig node label", err) + podENIErrInc("nodeInit") + return err + } + } else { + // Remove the custom networking label + err := c.SetNodeLabel(vpcENIConfigLabel, "") + if err != nil { + log.Errorf("Failed to delete eniConfig node label", err) + podENIErrInc("nodeInit") + return err + } + } + + // If we started on a node with a trunk ENI already attached, add the node label. + if trunkENI != "" { + // Signal to VPC Resource Controller that the node has a trunk already + err := c.SetNodeLabel("vpc.amazonaws.com/has-trunk-attached", "true") + if err != nil { + log.Errorf("Failed to set node label", err) + podENIErrInc("nodeInit") + // If this fails, we probably can't talk to the API server. 
Let the pod restart + return err + } + } else { + // Check if we want to ask for one + c.askForTrunkENIIfNeeded() + } + // For a new node, attach IPs increasedPool, err := c.tryAssignIPs() if err == nil && increasedPool { @@ -451,6 +505,7 @@ func (c *IPAMContext) StartNodeIPPoolManager() { } func (c *IPAMContext) updateIPPoolIfRequired() { + c.askForTrunkENIIfNeeded() if c.nodeIPPoolTooLow() { c.increaseIPPool() } else if c.nodeIPPoolTooHigh() { @@ -589,13 +644,19 @@ func (c *IPAMContext) increaseIPPool() { if increasedPool { c.updateLastNodeIPPoolAction() } else { + // Check if we need to make room for the VPC Resource Controller to attach a trunk ENI + reserveSlotForTrunkENI := 0 + if c.enablePodENI && c.dataStore.GetTrunkENI() == "" { + reserveSlotForTrunkENI = 1 + } // If we did not add an IP, try to add an ENI instead. - if c.dataStore.GetENIs() < (c.maxENI - c.unmanagedENI) { + if c.dataStore.GetENIs() < (c.maxENI - c.unmanagedENI - reserveSlotForTrunkENI) { if err = c.tryAllocateENI(); err == nil { c.updateLastNodeIPPoolAction() } } else { - log.Debugf("Skipping ENI allocation as the instance's max ENI limit of %d is already reached (accounting for %d unmanaged ENIs)", c.maxENI, c.unmanagedENI) + log.Debugf("Skipping ENI allocation as the max ENI limit of %d is already reached (accounting for %d unmanaged ENIs and %d trunk ENIs)", + c.maxENI, c.unmanagedENI, reserveSlotForTrunkENI) } } } @@ -653,14 +714,13 @@ func (c *IPAMContext) tryAllocateENI() error { log.Errorf("Failed to increase pool size: Unable to discover attached ENI from metadata service %v", err) return err } - - err = c.setupENI(eni, eniMetadata) + err = c.setupENI(eni, eniMetadata, c.dataStore.GetTrunkENI()) if err != nil { ipamdErrInc("increaseIPPoolsetupENIFailed") log.Errorf("Failed to increase pool size: %v", err) return err } - return nil + return err } // For an ENI, try to fill in missing IPs on an existing ENI @@ -702,9 +762,9 @@ func (c *IPAMContext) tryAssignIPs() (increasedPool 
bool, err error) { // 1) add ENI to datastore // 2) set up linux ENI related networking stack. // 3) add all ENI's secondary IP addresses to datastore -func (c *IPAMContext) setupENI(eni string, eniMetadata awsutils.ENIMetadata) error { +func (c *IPAMContext) setupENI(eni string, eniMetadata awsutils.ENIMetadata, trunkENI string) error { // Add the ENI to the datastore - err := c.dataStore.AddENI(eni, eniMetadata.DeviceNumber, eni == c.awsClient.GetPrimaryENI()) + err := c.dataStore.AddENI(eni, eniMetadata.DeviceNumber, eni == c.awsClient.GetPrimaryENI(), eni == trunkENI) if err != nil && err.Error() != datastore.DuplicatedENIError { return errors.Wrapf(err, "failed to add ENI %s to data store", eni) } @@ -812,6 +872,18 @@ func logPoolStats(total, used, maxAddrsPerENI int) { log.Debugf("IP pool stats: total = %d, used = %d, c.maxIPsPerENI = %d", total, used, maxAddrsPerENI) } +func (c *IPAMContext) askForTrunkENIIfNeeded() { + if c.enablePodENI && c.dataStore.GetTrunkENI() == "" { + // We need to signal that VPC Resource Controller needs to attach a trunk ENI + err := c.SetNodeLabel("vpc.amazonaws.com/has-trunk-attached", "false") + if err != nil { + podENIErrInc("askForTrunkENIIfNeeded") + log.Errorf("Failed to set node label", err) + } + return + } +} + // nodeIPPoolTooLow returns true if IP pool is below low threshold func (c *IPAMContext) nodeIPPoolTooLow() bool { short, _, warmIPTargetDefined := c.ipTargetState() @@ -864,6 +936,10 @@ func ipamdErrInc(fn string) { ipamdErr.With(prometheus.Labels{"fn": fn}).Inc() } +func podENIErrInc(fn string) { + podENIErr.With(prometheus.Labels{"fn": fn}).Inc() +} + // nodeIPPoolReconcile reconcile ENI and IP info from metadata service and IP addresses in datastore func (c *IPAMContext) nodeIPPoolReconcile(interval time.Duration) { curTime := time.Now() @@ -884,8 +960,9 @@ func (c *IPAMContext) nodeIPPoolReconcile(interval time.Duration) { } attachedENIs := c.filterUnmanagedENIs(allENIs) currentENIIPPools := 
c.dataStore.GetENIInfos().ENIs + trunkENI := c.dataStore.GetTrunkENI() - // Check if a new ENI was added, if so we need to update the tags + // Check if a new ENI was added, if so we need to update the tags. needToUpdateTags := false for _, attachedENI := range attachedENIs { if _, ok := currentENIIPPools[attachedENI.ENIID]; !ok { @@ -895,11 +972,23 @@ func (c *IPAMContext) nodeIPPoolReconcile(interval time.Duration) { } if needToUpdateTags { log.Debugf("A new ENI added but not by ipamd, updating tags") - allENIs, tagMap, err := c.awsClient.DescribeAllENIs() + allENIs, tagMap, trunk, err := c.awsClient.DescribeAllENIs() if err != nil { log.Warnf("Failed to call EC2 to describe ENIs, aborting reconcile: %v", err) return } + + if c.enablePodENI && trunk != "" { + // Label the node that we have a trunk + err = c.SetNodeLabel("vpc.amazonaws.com/has-trunk-attached", "true") + if err != nil { + podENIErrInc("askForTrunkENIIfNeeded") + log.Errorf("Failed to set node label for trunk. Aborting reconcile", err) + return + } + } + // Update trunk ENI + trunkENI = trunk c.setUnmanagedENIs(tagMap) attachedENIs = c.filterUnmanagedENIs(allENIs) } @@ -919,7 +1008,7 @@ func (c *IPAMContext) nodeIPPoolReconcile(interval time.Duration) { // Add new ENI log.Debugf("Reconcile and add a new ENI %s", attachedENI) - err = c.setupENI(attachedENI.ENIID, attachedENI) + err = c.setupENI(attachedENI.ENIID, attachedENI, trunkENI) if err != nil { log.Errorf("IP pool reconcile: Failed to set up ENI %s network: %v", attachedENI.ENIID, err) ipamdErrInc("eniReconcileAdd") @@ -1079,7 +1168,11 @@ func getMinimumIPTarget() int { } func disablingENIProvisioning() bool { - return getEnvBoolWithDefault(envDisableENIProvisioning, noDisableENIProvisioning) + return getEnvBoolWithDefault(envDisableENIProvisioning, false) +} + +func enablePodENI() bool { + return getEnvBoolWithDefault(envEnablePodENI, false) } // filterUnmanagedENIs filters out ENIs marked with the "node.k8s.amazonaws.com/no_manage" tag @@ 
-1158,3 +1251,60 @@ func min(x, y int) int { } return x } + +func (c *IPAMContext) getTrunkLinkIndex() (int, error) { + trunkENI := c.dataStore.GetTrunkENI() + attachedENIs, err := c.awsClient.GetAttachedENIs() + if err != nil { + return -1, err + } + for _, eni := range attachedENIs { + if eni.ENIID == trunkENI { + retryLinkByMacInterval := 100 * time.Millisecond + link, err := c.networkClient.GetLinkByMac(eni.MAC, retryLinkByMacInterval) + if err != nil { + return -1, err + } + return link.Attrs().Index, nil + + } + } + return -1, errors.New("No trunk!") +} + +func (c *IPAMContext) SetNodeLabel(key, value string) error { + // Find my node + node, err := c.k8sClient.CoreV1().Nodes().Get(c.myNodeName, metav1.GetOptions{}) + if err != nil { + log.Errorf("Failed to get node: %v", err) + return err + } + + if labelValue, ok := node.Labels[key]; ok && labelValue == value { + log.Debugf("Node label %q is already %q", key, labelValue) + return nil + } + // Make deep copy for modification + updateNode := node.DeepCopy() + + // Set node label + if value != "" { + updateNode.Labels[key] = value + } else { + // Empty value, delete the label + log.Debugf("Deleting label %q", key) + delete(updateNode.Labels, key) + } + + // Update node status to advertise the resource. 
+ _, err = c.k8sClient.CoreV1().Nodes().Update(updateNode) + if err != nil { + log.Errorf("Failed to update node %s with label %q: %q, error: %v", c.myNodeName, key, value, err) + } + log.Infof("Updated node %s with label %q: %q", c.myNodeName, key, value) + return nil +} + +func (c *IPAMContext) GetPod(podName, namespace string) (*v1.Pod, error) { + return c.k8sClient.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{}) +} diff --git a/pkg/ipamd/ipamd_test.go b/pkg/ipamd/ipamd_test.go index 3b54cc6de2..4f0764f37d 100644 --- a/pkg/ipamd/ipamd_test.go +++ b/pkg/ipamd/ipamd_test.go @@ -20,6 +20,9 @@ import ( "reflect" "testing" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/aws/amazon-vpc-cni-k8s/pkg/apis/crd/v1alpha1" "github.com/aws/amazon-vpc-cni-k8s/pkg/awsutils" mock_awsutils "github.com/aws/amazon-vpc-cni-k8s/pkg/awsutils/mocks" @@ -49,6 +52,7 @@ const ( ipaddr11 = "10.10.20.11" ipaddr12 = "10.10.20.12" vpcCIDR = "10.10.0.0/16" + myNodeName = "testNodeName" ) type testMocks struct { @@ -83,6 +87,7 @@ func TestNodeInit(t *testing.T) { mockContext := &IPAMContext{ awsClient: m.awsutils, + k8sClient: m.clientset, maxIPsPerENI: 14, maxENI: 4, warmENITarget: 1, @@ -91,6 +96,7 @@ func TestNodeInit(t *testing.T) { terminating: int32(0), networkClient: m.network, dataStore: datastore.NewDataStore(log, datastore.NewTestCheckpoint(fakeCheckpoint)), + myNodeName: myNodeName, } eni1, eni2 := getDummyENIMetadata() @@ -104,12 +110,12 @@ func TestNodeInit(t *testing.T) { primaryIP := net.ParseIP(ipaddr01) m.awsutils.EXPECT().GetVPCIPv4CIDRs().AnyTimes().Return(cidrs) m.awsutils.EXPECT().GetPrimaryENImac().Return("") - m.network.EXPECT().SetupHostNetwork(cidrs, "", &primaryIP).Return(nil) + m.network.EXPECT().SetupHostNetwork(cidrs, "", &primaryIP, false).Return(nil) m.awsutils.EXPECT().GetPrimaryENI().AnyTimes().Return(primaryENIid) eniMetadataSlice := []awsutils.ENIMetadata{eni1, eni2} - 
m.awsutils.EXPECT().DescribeAllENIs().Return(eniMetadataSlice, map[string]awsutils.TagMap{}, nil) + m.awsutils.EXPECT().DescribeAllENIs().Return(eniMetadataSlice, map[string]awsutils.TagMap{}, "", nil) m.network.EXPECT().SetupENINetwork(gomock.Any(), secMAC, secDevice, secSubnet) m.awsutils.EXPECT().GetLocalIPv4().Return(ipaddr01) @@ -119,6 +125,15 @@ func TestNodeInit(t *testing.T) { m.network.EXPECT().UseExternalSNAT().Return(false) m.network.EXPECT().UpdateRuleListBySrc(gomock.Any(), gomock.Any(), gomock.Any(), true) + + fakeNode := v1.Node{ + TypeMeta: metav1.TypeMeta{Kind: "Node"}, + ObjectMeta: metav1.ObjectMeta{Name: myNodeName}, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{}, + } + _, _ = m.clientset.CoreV1().Nodes().Create(&fakeNode) + // Add IPs m.awsutils.EXPECT().AllocIPAddresses(gomock.Any(), gomock.Any()) @@ -360,7 +375,7 @@ func TestNodeIPPoolReconcile(t *testing.T) { } m.awsutils.EXPECT().GetAttachedENIs().Return(eniMetadata, nil) m.awsutils.EXPECT().GetPrimaryENI().Times(2).Return(primaryENIid) - m.awsutils.EXPECT().DescribeAllENIs().Return(eniMetadata, map[string]awsutils.TagMap{}, nil) + m.awsutils.EXPECT().DescribeAllENIs().Return(eniMetadata, map[string]awsutils.TagMap{}, "", nil) mockContext.nodeIPPoolReconcile(0) @@ -437,7 +452,7 @@ func TestGetWarmIPTargetState(t *testing.T) { assert.Equal(t, 0, over) // add 2 addresses to datastore - _ = mockContext.dataStore.AddENI("eni-1", 1, true) + _ = mockContext.dataStore.AddENI("eni-1", 1, true, false) _ = mockContext.dataStore.AddIPv4AddressToStore("eni-1", "1.1.1.1") _ = mockContext.dataStore.AddIPv4AddressToStore("eni-1", "1.1.1.2") @@ -508,7 +523,7 @@ func testDatastore() *datastore.DataStore { func datastoreWith3FreeIPs() *datastore.DataStore { datastoreWith3FreeIPs := testDatastore() - _ = datastoreWith3FreeIPs.AddENI(primaryENIid, 1, true) + _ = datastoreWith3FreeIPs.AddENI(primaryENIid, 1, true, false) _ = datastoreWith3FreeIPs.AddIPv4AddressToStore(primaryENIid, ipaddr01) _ = 
datastoreWith3FreeIPs.AddIPv4AddressToStore(primaryENIid, ipaddr02) _ = datastoreWith3FreeIPs.AddIPv4AddressToStore(primaryENIid, ipaddr03) @@ -585,3 +600,16 @@ func TestDisablingENIProvisioning(t *testing.T) { disabled = disablingENIProvisioning() assert.False(t, disabled) } + +func TestPodENIConfigFlag(t *testing.T) { + m := setup(t) + defer m.ctrl.Finish() + + _ = os.Setenv(envEnablePodENI, "true") + disabled := enablePodENI() + assert.True(t, disabled) + + _ = os.Unsetenv(envEnablePodENI) + disabled = enablePodENI() + assert.False(t, disabled) +} diff --git a/pkg/ipamd/rpc_handler.go b/pkg/ipamd/rpc_handler.go index 7d8a2cee47..7f72eff16a 100644 --- a/pkg/ipamd/rpc_handler.go +++ b/pkg/ipamd/rpc_handler.go @@ -14,9 +14,11 @@ package ipamd import ( + "encoding/json" "net" "os" "os/signal" + "strings" "syscall" "github.com/pkg/errors" @@ -28,7 +30,9 @@ import ( "google.golang.org/grpc/reflection" "github.com/aws/amazon-vpc-cni-k8s/pkg/ipamd/datastore" + "github.com/aws/amazon-vpc-cni-k8s/pkg/networkutils" "github.com/aws/amazon-vpc-cni-k8s/rpc" + k8serror "k8s.io/apimachinery/pkg/api/errors" ) const ( @@ -41,18 +45,88 @@ type server struct { ipamContext *IPAMContext } +// PodENIData is used to parse the list of ENIs in the branch ENI pod annotation +type PodENIData struct { + ENIID string `json:"eniId"` + IfAddress string `json:"ifAddress"` + PrivateIP string `json:"privateIp"` + VlanID int `json:"vlanId"` + SubnetCIDR string `json:"subnetCidr"` +} + // AddNetwork processes CNI add network request and return an IP address for container func (s *server) AddNetwork(ctx context.Context, in *rpc.AddNetworkRequest) (*rpc.AddNetworkReply, error) { log.Infof("Received AddNetwork for NS %s, Sandbox %s, ifname %s", in.Netns, in.ContainerID, in.IfName) + addIPCnt.Inc() - ipamKey := datastore.IPAMKey{ - ContainerID: in.ContainerID, - IfName: in.IfName, - NetworkName: in.NetworkName, + failureResponse := rpc.AddNetworkReply{Success: false} + var deviceNumber, vlanId, 
trunkENILinkIndex int + var addr, branchENIMAC, podENISubnetGW string + var err error + if s.ipamContext.enablePodENI { + // Check pod spec for Branch ENI + pod, err := s.ipamContext.GetPod(in.K8S_POD_NAME, in.K8S_POD_NAMESPACE) + if err != nil { + log.Warnf("Send AddNetworkReply: Failed to get pod: %v", err) + return &failureResponse, nil + } + limits := pod.Spec.Containers[0].Resources.Limits + for resName := range limits { + if strings.HasPrefix(string(resName), "vpc.amazonaws.com/pod-eni") { + // Check that we have a trunk + trunkENI := s.ipamContext.dataStore.GetTrunkENI() + if trunkENI == "" { + log.Warn("Send AddNetworkReply: No trunk ENI found, cannot add a pod ENI") + return &failureResponse, nil + } else { + trunkENILinkIndex, err = s.ipamContext.getTrunkLinkIndex() + if err != nil { + log.Warn("Send AddNetworkReply: No trunk ENI Link Index found, cannot add a pod ENI") + return &failureResponse, nil + } + } + val, branch := pod.Annotations["vpc.amazonaws.com/pod-eni"] + if branch { + // Parse JSON data + var podENIData []PodENIData + err := json.Unmarshal([]byte(val), &podENIData) + if err != nil || len(podENIData) < 1 { + log.Errorf("Failed to unmarshal PodENIData JSON: %v", err) + return &failureResponse, nil + } + firstENI := podENIData[0] + addr = firstENI.PrivateIP + branchENIMAC = firstENI.IfAddress + vlanId = firstENI.VlanID + if addr == "" || branchENIMAC == "" || vlanId == 0 { + log.Errorf("Failed to parse pod-ENI annotation: %s", val) + return &failureResponse, nil + } + currentGW := strings.Split(firstENI.SubnetCIDR, "/")[0] + // Increment value CIDR value + nextGWIP, err := networkutils.IncrementIPv4Addr(net.ParseIP(currentGW)) + if err != nil { + log.Errorf("Unable to get next Gateway IP for branch ENI from %s: %v", currentGW, err) + return &failureResponse, nil + } + podENISubnetGW = nextGWIP.String() + deviceNumber = -1 // Not needed for branch ENI, they depend on trunkENIDeviceIndex + } else { + log.Infof("Send AddNetworkReply: failed to 
get Branch ENI resource") + return &failureResponse, nil + } + } + } + } + if addr == "" { + ipamKey := datastore.IPAMKey{ + ContainerID: in.ContainerID, + IfName: in.IfName, + NetworkName: in.NetworkName, + } + addr, deviceNumber, err = s.ipamContext.dataStore.AssignPodIPv4Address(ipamKey) } - addr, deviceNumber, err := s.ipamContext.dataStore.AssignPodIPv4Address(ipamKey) - pbVPCcidrs := s.ipamContext.awsClient.GetVPCIPv4CIDRs() for _, cidr := range pbVPCcidrs { log.Debugf("VPC CIDR %s", cidr) @@ -72,17 +146,47 @@ func (s *server) AddNetwork(ctx context.Context, in *rpc.AddNetworkRequest) (*rp DeviceNumber: int32(deviceNumber), UseExternalSNAT: useExternalSNAT, VPCcidrs: pbVPCcidrs, + PodVlanId: int32(vlanId), + PodENIMAC: branchENIMAC, + PodENISubnetGW: podENISubnetGW, + ParentIfIndex: int32(trunkENILinkIndex), } log.Infof("Send AddNetworkReply: IPv4Addr %s, DeviceNumber: %d, err: %v", addr, deviceNumber, err) - addIPCnt.Inc() return &resp, nil } +// DelNetwork processes CNI delete network request; for a pod annotated with a branch ENI it replies with that ENI's VLAN ID and private IP func (s *server) DelNetwork(ctx context.Context, in *rpc.DelNetworkRequest) (*rpc.DelNetworkReply, error) { log.Infof("Received DelNetwork for Sandbox %s", in.ContainerID) delIPCnt.With(prometheus.Labels{"reason": in.Reason}).Inc() - + if s.ipamContext.enablePodENI { + pod, err := s.ipamContext.GetPod(in.K8S_POD_NAME, in.K8S_POD_NAMESPACE) + if err != nil { + if k8serror.IsNotFound(err) { + log.Warn("Send DelNetworkReply: pod not found") + return &rpc.DelNetworkReply{Success: true}, nil + } + log.Warnf("Send DelNetworkReply: Failed to get pod spec: %v", err) + return &rpc.DelNetworkReply{Success: false}, err + } else { + val, branch := pod.Annotations["vpc.amazonaws.com/pod-eni"] + if branch { + // Parse JSON data + var podENIData []PodENIData + err := json.Unmarshal([]byte(val), &podENIData) + if err != nil || len(podENIData) < 1 { + log.Errorf("Failed to unmarshal PodENIData JSON: %v", err) + // Return here: the reply below reads podENIData[0], which would panic on an empty slice + return &rpc.DelNetworkReply{Success: false}, err + } + return &rpc.DelNetworkReply{ + Success: true, + PodVlanId: int32(podENIData[0].VlanID), + IPv4Addr: 
podENIData[0].PrivateIP}, err + } + } + } ipamKey := datastore.IPAMKey{ ContainerID: in.ContainerID, IfName: in.IfName, diff --git a/pkg/k8sapi/discovery.go b/pkg/k8sapi/discovery.go index 83e9c861f8..7b016771a0 100644 --- a/pkg/k8sapi/discovery.go +++ b/pkg/k8sapi/discovery.go @@ -34,6 +34,12 @@ const ( cniPodName = "aws-node" ) +// K8SAPIs defines interface to use kubelet introspection API +type K8SAPIs interface { + SetNodeLabel(key, value string) error + GetPod(podName, namespace string) (*v1.Pod, error) +} + // K8SPodInfo provides pod info type K8SPodInfo struct { // Name is pod's name @@ -105,6 +111,9 @@ func (d *Controller) GetCNIPods() []string { return cniPods } +// TODO: Add DiscoverNodeLabels() watcher function. +// TODO: Or is the right solution to look for a ConfigMap? + // DiscoverCNIK8SPods discovers CNI pods, aws-node, running in the cluster func (d *Controller) DiscoverCNIK8SPods() { // create the pod watcher @@ -154,6 +163,50 @@ func (d *Controller) DiscoverK8SPods(podListWatcher *cache.ListWatch) { select {} } +// SetNodeLabel sets label key to value on the node named d.myNodeName; an empty value removes the label. It returns any error from fetching or updating the node. +func (d *Controller) SetNodeLabel(key, value string) error { + // Find my node + node, err := d.kubeClient.CoreV1().Nodes().Get(d.myNodeName, metav1.GetOptions{}) + if err != nil { + log.Errorf("Failed to get node: %v", err) + return err + } + + if labelValue, ok := node.Labels[key]; ok && labelValue == value { + log.Debugf("Node label %q is already %q", key, labelValue) + return nil + } + // Make deep copy for modification + updateNode := node.DeepCopy() + + // Set node label + if value != "" { + if updateNode.Labels == nil { + // A node without labels has a nil map; assigning into it would panic + updateNode.Labels = make(map[string]string) + } + updateNode.Labels[key] = value + } else { + // Empty value, delete the label + log.Debugf("Deleting label %q", key) + delete(updateNode.Labels, key) + } + + // Persist the modified labels by updating the node object. 
+ _, err = d.kubeClient.CoreV1().Nodes().Update(updateNode) + if err != nil { + log.Errorf("Failed to update node %s with label %q: %q, error: %v", d.myNodeName, key, value, err) + return err + } + log.Infof("Updated node %s with label %q: %q", d.myNodeName, key, value) + return nil +} + +// GetPod returns the pod with the given name in the given namespace, fetched through d.kubeClient. +func (d *Controller) GetPod(podName, namespace string) (*v1.Pod, error) { + return d.kubeClient.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{}) +} + // The rest of logic/code are taken from kubernetes/client-go/examples/workqueue func newController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *controller { return &controller{ diff --git a/pkg/k8sapi/mocks/k8sapi_mocks.go b/pkg/k8sapi/mocks/k8sapi_mocks.go new file mode 100644 index 0000000000..cb4ba85a6a --- /dev/null +++ b/pkg/k8sapi/mocks/k8sapi_mocks.go @@ -0,0 +1,94 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi (interfaces: K8SAPIs) + +// Package mock_k8sapi is a generated GoMock package. 
+package mock_k8sapi + +import ( + reflect "reflect" + + k8sapi "github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi" + gomock "github.com/golang/mock/gomock" + v1 "k8s.io/api/core/v1" +) + +// MockK8SAPIs is a mock of K8SAPIs interface +type MockK8SAPIs struct { + ctrl *gomock.Controller + recorder *MockK8SAPIsMockRecorder +} + +// MockK8SAPIsMockRecorder is the mock recorder for MockK8SAPIs +type MockK8SAPIsMockRecorder struct { + mock *MockK8SAPIs +} + +// NewMockK8SAPIs creates a new mock instance +func NewMockK8SAPIs(ctrl *gomock.Controller) *MockK8SAPIs { + mock := &MockK8SAPIs{ctrl: ctrl} + mock.recorder = &MockK8SAPIsMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockK8SAPIs) EXPECT() *MockK8SAPIsMockRecorder { + return m.recorder +} + +// GetPod mocks base method +func (m *MockK8SAPIs) GetPod(arg0, arg1 string) (*v1.Pod, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPod", arg0, arg1) + ret0, _ := ret[0].(*v1.Pod) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetPod indicates an expected call of GetPod +func (mr *MockK8SAPIsMockRecorder) GetPod(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPod", reflect.TypeOf((*MockK8SAPIs)(nil).GetPod), arg0, arg1) +} + +// K8SGetLocalPodIPs mocks base method +func (m *MockK8SAPIs) K8SGetLocalPodIPs() ([]*k8sapi.K8SPodInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "K8SGetLocalPodIPs") + ret0, _ := ret[0].([]*k8sapi.K8SPodInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// K8SGetLocalPodIPs indicates an expected call of K8SGetLocalPodIPs +func (mr *MockK8SAPIsMockRecorder) K8SGetLocalPodIPs() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "K8SGetLocalPodIPs", reflect.TypeOf((*MockK8SAPIs)(nil).K8SGetLocalPodIPs)) +} + +// SetNodeLabel mocks base method +func (m *MockK8SAPIs) 
SetNodeLabel(arg0, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetNodeLabel", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetNodeLabel indicates an expected call of SetNodeLabel +func (mr *MockK8SAPIsMockRecorder) SetNodeLabel(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetNodeLabel", reflect.TypeOf((*MockK8SAPIs)(nil).SetNodeLabel), arg0, arg1) +} diff --git a/pkg/networkutils/mocks/network_mocks.go b/pkg/networkutils/mocks/network_mocks.go index fb4215ba14..68cc475659 100644 --- a/pkg/networkutils/mocks/network_mocks.go +++ b/pkg/networkutils/mocks/network_mocks.go @@ -21,6 +21,7 @@ package mock_networkutils import ( net "net" reflect "reflect" + "time" gomock "github.com/golang/mock/gomock" netlink "github.com/vishvananda/netlink" @@ -122,17 +123,17 @@ func (mr *MockNetworkAPIsMockRecorder) SetupENINetwork(arg0, arg1, arg2, arg3 in } // SetupHostNetwork mocks base method -func (m *MockNetworkAPIs) SetupHostNetwork(arg0 []string, arg1 string, arg2 *net.IP) error { +func (m *MockNetworkAPIs) SetupHostNetwork(arg0 []string, arg1 string, arg2 *net.IP, arg3 bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SetupHostNetwork", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "SetupHostNetwork", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) return ret0 } // SetupHostNetwork indicates an expected call of SetupHostNetwork -func (mr *MockNetworkAPIsMockRecorder) SetupHostNetwork(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockNetworkAPIsMockRecorder) SetupHostNetwork(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetupHostNetwork", reflect.TypeOf((*MockNetworkAPIs)(nil).SetupHostNetwork), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetupHostNetwork", reflect.TypeOf((*MockNetworkAPIs)(nil).SetupHostNetwork), arg0, arg1, arg2, 
arg3) } // UpdateRuleListBySrc mocks base method @@ -162,3 +163,18 @@ func (mr *MockNetworkAPIsMockRecorder) UseExternalSNAT() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UseExternalSNAT", reflect.TypeOf((*MockNetworkAPIs)(nil).UseExternalSNAT)) } + +// GetLinkByMac mocks base method +func (m *MockNetworkAPIs) GetLinkByMac(arg0 string, arg1 time.Duration) (netlink.Link, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetLinkByMac", arg0, arg1) + ret0, _ := ret[0].(netlink.Link) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetLinkByMac indicates an expected call of GetLinkByMac +func (mr *MockNetworkAPIsMockRecorder) GetLinkByMac(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLinkByMac", reflect.TypeOf((*MockNetworkAPIs)(nil).GetLinkByMac), arg0, arg1) +} diff --git a/pkg/networkutils/network.go b/pkg/networkutils/network.go index 7c854b1a23..fa3df76f9f 100644 --- a/pkg/networkutils/network.go +++ b/pkg/networkutils/network.go @@ -42,6 +42,9 @@ import ( ) const ( + // Local rule, needs to come after the pod ENI rules + localRulePriority = 2 + // 513 - 1023, can be used priority lower than toPodRulePriority but higher than default nonVPC CIDR rule // 1024 is reserved for (ip rule not to table main) @@ -50,8 +53,12 @@ const ( // 1025 - 1535 can be used priority lower than fromPodRulePriority but higher than default nonVPC CIDR rule fromPodRulePriority = 1536 + // Main route table mainRoutingTable = unix.RT_TABLE_MAIN + // Local route table + localRouteTable = unix.RT_TABLE_LOCAL + // This environment is used to specify whether an external NAT gateway will be used to provide SNAT of // secondary ENI IP addresses. If set to "true", the SNAT iptables rule and off-VPC ip rule will not // be installed and will be removed if they are already installed. Defaults to false. 
@@ -113,7 +120,7 @@ var log = logger.Get() // NetworkAPIs defines the host level and the ENI level network related operations type NetworkAPIs interface { // SetupNodeNetwork performs node level network configuration - SetupHostNetwork(vpcCIDRs []string, primaryMAC string, primaryAddr *net.IP) error + SetupHostNetwork(vpcCIDRs []string, primaryMAC string, primaryAddr *net.IP, enablePodENI bool) error // SetupENINetwork performs eni level network configuration SetupENINetwork(eniIP string, mac string, table int, subnetCIDR string) error UseExternalSNAT() bool @@ -122,6 +129,7 @@ type NetworkAPIs interface { GetRuleListBySrc(ruleList []netlink.Rule, src net.IPNet) ([]netlink.Rule, error) UpdateRuleListBySrc(ruleList []netlink.Rule, src net.IPNet, toCIDRs []string, toFlag bool) error DeleteRuleListBySrc(src net.IPNet) error + GetLinkByMac(mac string, retryInterval time.Duration) (netlink.Link, error) } type linuxNetwork struct { @@ -205,7 +213,7 @@ func findPrimaryInterfaceName(primaryMAC string) (string, error) { } // SetupHostNetwork performs node level network configuration -func (n *linuxNetwork) SetupHostNetwork(vpcCIDRs []string, primaryMAC string, primaryAddr *net.IP) error { +func (n *linuxNetwork) SetupHostNetwork(vpcCIDRs []string, primaryMAC string, primaryAddr *net.IP, enablePodENI bool) error { log.Info("Setting up host network... 
") var err error @@ -238,7 +246,7 @@ func (n *linuxNetwork) SetupHostNetwork(vpcCIDRs []string, primaryMAC string, pr } } - link, err := LinkByMac(primaryMAC, n.netLink, retryLinkByMacInterval) + link, err := linkByMac(primaryMAC, n.netLink, retryLinkByMacInterval) if err != nil { return errors.Wrapf(err, "setupHostNetwork: failed to find the link primary ENI with MAC address %s", primaryMAC) } @@ -271,6 +279,25 @@ func (n *linuxNetwork) SetupHostNetwork(vpcCIDRs []string, primaryMAC string, pr } } + // If we want per pod ENIs, we need to give pod ENIs veth bridges a lower priority that the local table, + // or the rp_filter check will fail. + if enablePodENI { + localRule := n.netLink.NewRule() + localRule.Table = localRouteTable + localRule.Priority = localRulePriority + // Add new rule with higher priority + err := n.netLink.RuleAdd(localRule) + if err != nil && !isRuleExistsError(err) { + return errors.Wrap(err, "ChangeLocalRulePriority: unable to update local rule priority") + } + // Delete the priority 0 rule + localRule.Priority = 0 + err = n.netLink.RuleDel(localRule) + if err != nil && !containsNoSuchRule(err) { + return errors.Wrap(err, "ChangeLocalRulePriority: failed to delete priority 0 local rule") + } + } + ipt, err := n.newIptables() if err != nil { return errors.Wrap(err, "host network setup: failed to create iptables") @@ -420,6 +447,17 @@ func (n *linuxNetwork) SetupHostNetwork(vpcCIDRs []string, primaryMAC string, pr }, }) + iptableRules = append(iptableRules, iptablesRule{ + name: "connmark restore for primary ENI from vlan", + shouldExist: n.nodePortSupportEnabled, + table: "mangle", + chain: "PREROUTING", + rule: []string{ + "-m", "comment", "--comment", "AWS, primary ENI", + "-i", "vlan+", "-j", "CONNMARK", "--restore-mark", "--mask", fmt.Sprintf("%#x", n.mainENIMark), + }, + }) + for _, rule := range iptableRules { log.Debugf("execute iptable rule : %s", rule.name) @@ -504,6 +542,13 @@ func containsNoSuchRule(err error) bool { return false 
} +func isRuleExistsError(err error) bool { + if errno, ok := err.(syscall.Errno); ok { + return errno == syscall.EEXIST + } + return false +} + // GetConfigForDebug returns the active values of the configuration env vars (for debugging purposes). func GetConfigForDebug() map[string]interface{} { return map[string]interface{}{ @@ -615,8 +660,13 @@ func getConnmark() uint32 { return defaultConnmark } -// LinkByMac returns linux netlink based on interface MAC -func LinkByMac(mac string, netLink netlinkwrapper.NetLink, retryInterval time.Duration) (netlink.Link, error) { +// GetLinkByMac returns linux netlink based on interface MAC +func (n *linuxNetwork) GetLinkByMac(mac string, retryInterval time.Duration) (netlink.Link, error) { + return linkByMac(mac, n.netLink, retryInterval) +} + +// linkByMac returns linux netlink based on interface MAC +func linkByMac(mac string, netLink netlinkwrapper.NetLink, retryInterval time.Duration) (netlink.Link, error) { // The adapter might not be immediately available, so we perform retries var lastErr error attempt := 0 @@ -629,7 +679,6 @@ func LinkByMac(mac string, netLink netlinkwrapper.NetLink, retryInterval time.Du } links, err := netLink.LinkList() - if err != nil { lastErr = errors.Errorf("%s (attempt %d/%d)", err, attempt, maxAttemptsLinkByMac) log.Debugf(lastErr.Error()) @@ -664,7 +713,7 @@ func setupENINetwork(eniIP string, eniMAC string, eniTable int, eniSubnetCIDR st log.Infof("Setting up network for an ENI with IP address %s, MAC address %s, CIDR %s and route table %d", eniIP, eniMAC, eniSubnetCIDR, eniTable) - link, err := LinkByMac(eniMAC, netLink, retryLinkByMacInterval) + link, err := linkByMac(eniMAC, netLink, retryLinkByMacInterval) if err != nil { return errors.Wrapf(err, "setupENINetwork: failed to find the link which uses MAC address %s", eniMAC) } @@ -684,7 +733,7 @@ func setupENINetwork(eniIP string, eniMAC string, eniTable int, eniSubnetCIDR st return errors.Wrapf(err, "setupENINetwork: invalid IPv4 CIDR 
block %s", eniSubnetCIDR) } - gw, err := incrementIPv4Addr(ipnet.IP) + gw, err := IncrementIPv4Addr(ipnet.IP) if err != nil { return errors.Wrapf(err, "setupENINetwork: failed to define gateway address from %v", ipnet.IP) } @@ -774,8 +823,8 @@ func setupENINetwork(eniIP string, eniMAC string, eniTable int, eniSubnetCIDR st return nil } -// incrementIPv4Addr returns incremented IPv4 address -func incrementIPv4Addr(ip net.IP) (net.IP, error) { +// IncrementIPv4Addr returns incremented IPv4 address +func IncrementIPv4Addr(ip net.IP) (net.IP, error) { ip4 := ip.To4() if ip4 == nil { return nil, fmt.Errorf("%q is not a valid IPv4 Address", ip) diff --git a/pkg/networkutils/network_test.go b/pkg/networkutils/network_test.go index de69413602..cd4e9e094e 100644 --- a/pkg/networkutils/network_test.go +++ b/pkg/networkutils/network_test.go @@ -86,10 +86,10 @@ func TestSetupENINetwork(t *testing.T) { eth1 := mock_netlink.NewMockLink(ctrl) // Emulate a delay attaching the ENI so a retry is necessary // First attempt gets one links - firstlistSet := mockNetLink.EXPECT().LinkList().Return([]netlink.Link([]netlink.Link{lo}), nil) + firstlistSet := mockNetLink.EXPECT().LinkList().Return([]netlink.Link{lo}, nil) lo.EXPECT().Attrs().Return(mockLinkAttrs1) // Second attempt gets both links - secondlistSet := mockNetLink.EXPECT().LinkList().Return([]netlink.Link([]netlink.Link{lo, eth1}), nil) + secondlistSet := mockNetLink.EXPECT().LinkList().Return([]netlink.Link{lo, eth1}, nil) lo.EXPECT().Attrs().Return(mockLinkAttrs1) eth1.EXPECT().Attrs().Return(mockLinkAttrs2) gomock.InOrder(firstlistSet, secondlistSet) @@ -161,7 +161,7 @@ func TestSetupHostNetworkNodePortDisabled(t *testing.T) { mockNetLink.EXPECT().RuleDel(&mainENIRule) var vpcCIDRs []string - err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP) + err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP, false) assert.NoError(t, err) } @@ -287,7 +287,7 @@ func TestSetupHostNetworkNodePortEnabled(t *testing.T) { 
var vpcCIDRs []string - err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP) + err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP, false) assert.NoError(t, err) assert.Equal(t, map[string]map[string][][]string{ @@ -360,7 +360,7 @@ func TestSetupHostNetworkWithExcludeSNATCIDRs(t *testing.T) { mockProcSys.EXPECT().Set("net/ipv4/conf/lo/rp_filter", "2").Return(nil) vpcCIDRs := []string{"10.10.0.0/16", "10.11.0.0/16"} - err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP) + err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP, false) assert.NoError(t, err) assert.Equal(t, map[string]map[string][][]string{ @@ -413,7 +413,7 @@ func TestSetupHostNetworkCleansUpStaleSNATRules(t *testing.T) { _ = mockIptables.Append("nat", "POSTROUTING", "-m", "comment", "--comment", "AWS SNAT CHAIN", "-j", "AWS-SNAT-CHAIN-0") vpcCIDRs := []string{"10.10.0.0/16", "10.11.0.0/16"} - err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP) + err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP, false) assert.NoError(t, err) assert.Equal(t, @@ -467,7 +467,7 @@ func TestSetupHostNetworkExcludedSNATCIDRsIdempotent(t *testing.T) { // remove exclusions vpcCIDRs := []string{"10.10.0.0/16", "10.11.0.0/16"} - err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP) + err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP, false) assert.NoError(t, err) assert.Equal(t, @@ -512,7 +512,7 @@ func TestSetupHostNetworkMultipleCIDRs(t *testing.T) { mockProcSys.EXPECT().Set("net/ipv4/conf/lo/rp_filter", "2").Return(nil) vpcCIDRs := []string{"10.10.0.0/16", "10.11.0.0/16"} - err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP) + err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP, false) assert.NoError(t, err) } @@ -531,7 +531,7 @@ func TestIncrementIPv4Addr(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - result, err := incrementIPv4Addr(tc.ip) + result, err := IncrementIPv4Addr(tc.ip) if tc.err { 
assert.Error(t, err) } else { @@ -563,10 +563,36 @@ func TestSetupHostNetworkIgnoringRpFilterUpdate(t *testing.T) { setupNetLinkMocks(ctrl, mockNetLink) var vpcCIDRs []string - err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP) + err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP, false) assert.NoError(t, err) } +//// TODO: fix test +//func TestSetupHostNetworkUpdateLocalRule(t *testing.T) { +// ctrl, mockNetLink, _, mockNS, mockIptables, mockProcSys := setup(t) +// defer ctrl.Finish() +// +// ln := &linuxNetwork{ +// useExternalSNAT: true, +// nodePortSupportEnabled: true, +// shouldConfigureRpFilter: true, +// mainENIMark: defaultConnmark, +// mtu: testMTU, +// +// netLink: mockNetLink, +// ns: mockNS, +// newIptables: func() (iptablesIface, error) { +// return mockIptables, nil +// }, +// procSys: mockProcSys, +// } +// setupNetLinkMocks(ctrl, mockNetLink) +// // TODO +// var vpcCIDRs []*string +// err := ln.SetupHostNetwork(testENINetIPNet, vpcCIDRs, loopback, &testENINetIP, true) +// assert.NoError(t, err) +//} + func setupNetLinkMocks(ctrl *gomock.Controller, mockNetLink *mock_netlinkwrapper.MockNetLink) { mockPrimaryInterfaceLookup(ctrl, mockNetLink) mockNetLink.EXPECT().LinkSetMTU(gomock.Any(), testMTU).Return(nil)