Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add vlan support to ipamd
Browse files Browse the repository at this point in the history
Claes Mogren authored and mogren committed Aug 11, 2020
1 parent ad4802b commit 9073605
Showing 22 changed files with 713 additions and 102 deletions.
5 changes: 3 additions & 2 deletions cmd/cni-metrics-helper/README.md
Original file line number Diff line number Diff line change
@@ -15,10 +15,10 @@ The following diagram shows how `cni-metrics-helper` works in a cluster:

### Installing the cni-metrics-helper
```
kubectl apply -f v1.5/cni-metrics-helper.yaml
kubectl apply -f v1.6/cni-metrics-helper.yaml
```

Adding this will publish the following metrics to CloudWatch:
Adding the CNI metrics helper will publish the following metrics to CloudWatch:
```
"addReqCount",
"assignIPAddresses",
@@ -31,6 +31,7 @@ Adding this will publish the following metrics to CloudWatch:
"ipamdActionInProgress",
"ipamdErr",
"maxIPAddresses",
"podENIErr",
"reconcileCount",
"totalIPAddresses",
```
7 changes: 7 additions & 0 deletions cmd/cni-metrics-helper/metrics/cni_metrics.go
Original file line number Diff line number Diff line change
@@ -128,6 +128,13 @@ var InterestingCNIMetrics = map[string]metricsConvert{
actionFunc: metricsAdd,
data: &dataPoints{},
logToFile: true}}},
"awscni_pod_eni_error_count": {
actions: []metricsAction{
{cwMetricName: "podENIErr",
matchFunc: matchAny,
actionFunc: metricsAdd,
data: &dataPoints{},
logToFile: true}}},
}

// CNIMetricsTarget defines data structure for kube-state-metric target
12 changes: 11 additions & 1 deletion config/master/aws-k8s-cni-cn.yaml
Original file line number Diff line number Diff line change
@@ -29,12 +29,20 @@
- ""
"resources":
- "pods"
- "nodes"
- "namespaces"
"verbs":
- "list"
- "watch"
- "get"
- "apiGroups":
- ""
"resources":
- "nodes"
"verbs":
- "list"
- "watch"
- "get"
- "update"
- "apiGroups":
- "extensions"
"resources":
@@ -135,6 +143,8 @@
"value": "false"
- "name": "DISABLE_METRICS"
"value": "false"
- "name": "ENABLE_POD_ENI"
"value": "false"
- "name": "MY_NODE_NAME"
"valueFrom":
"fieldRef":
12 changes: 11 additions & 1 deletion config/master/aws-k8s-cni-us-gov-east-1.yaml
Original file line number Diff line number Diff line change
@@ -29,12 +29,20 @@
- ""
"resources":
- "pods"
- "nodes"
- "namespaces"
"verbs":
- "list"
- "watch"
- "get"
- "apiGroups":
- ""
"resources":
- "nodes"
"verbs":
- "list"
- "watch"
- "get"
- "update"
- "apiGroups":
- "extensions"
"resources":
@@ -135,6 +143,8 @@
"value": "false"
- "name": "DISABLE_METRICS"
"value": "false"
- "name": "ENABLE_POD_ENI"
"value": "false"
- "name": "MY_NODE_NAME"
"valueFrom":
"fieldRef":
12 changes: 11 additions & 1 deletion config/master/aws-k8s-cni-us-gov-west-1.yaml
Original file line number Diff line number Diff line change
@@ -29,12 +29,20 @@
- ""
"resources":
- "pods"
- "nodes"
- "namespaces"
"verbs":
- "list"
- "watch"
- "get"
- "apiGroups":
- ""
"resources":
- "nodes"
"verbs":
- "list"
- "watch"
- "get"
- "update"
- "apiGroups":
- "extensions"
"resources":
@@ -135,6 +143,8 @@
"value": "false"
- "name": "DISABLE_METRICS"
"value": "false"
- "name": "ENABLE_POD_ENI"
"value": "false"
- "name": "MY_NODE_NAME"
"valueFrom":
"fieldRef":
12 changes: 11 additions & 1 deletion config/master/aws-k8s-cni.yaml
Original file line number Diff line number Diff line change
@@ -29,12 +29,20 @@
- ""
"resources":
- "pods"
- "nodes"
- "namespaces"
"verbs":
- "list"
- "watch"
- "get"
- "apiGroups":
- ""
"resources":
- "nodes"
"verbs":
- "list"
- "watch"
- "get"
- "update"
- "apiGroups":
- "extensions"
"resources":
@@ -135,6 +143,8 @@
"value": "false"
- "name": "DISABLE_METRICS"
"value": "false"
- "name": "ENABLE_POD_ENI"
"value": "false"
- "name": "MY_NODE_NAME"
"valueFrom":
"fieldRef":
20 changes: 13 additions & 7 deletions config/master/manifests.jsonnet
Original file line number Diff line number Diff line change
@@ -40,9 +40,14 @@ local awsnode = {
},
{
apiGroups: [""],
resources: ["pods", "nodes", "namespaces"],
resources: ["pods", "namespaces"],
verbs: ["list", "watch", "get"],
},
{
apiGroups: [""],
resources: ["nodes"],
verbs: ["list", "watch", "get", "update"],
},
{
apiGroups: ["extensions"],
resources: ["*"],
@@ -155,26 +160,27 @@ local awsnode = {
initialDelaySeconds: 60,
},
env_:: {
ADDITIONAL_ENI_TAGS: "{}",
AWS_VPC_CNI_NODE_PORT_SUPPORT: "true",
AWS_VPC_K8S_CNI_CUSTOM_NETWORK_CFG: "false",
AWS_VPC_ENI_MTU: "9001",
AWS_VPC_K8S_CNI_CONFIGURE_RPFILTER: "false",
AWS_VPC_K8S_CNI_CUSTOM_NETWORK_CFG: "false",
AWS_VPC_K8S_CNI_EXTERNALSNAT: "false",
AWS_VPC_K8S_CNI_RANDOMIZESNAT: "prng",
WARM_ENI_TARGET: "1",
AWS_VPC_K8S_CNI_LOGLEVEL: "DEBUG",
AWS_VPC_K8S_CNI_LOG_FILE: "/host/var/log/aws-routed-eni/ipamd.log",
AWS_VPC_K8S_CNI_RANDOMIZESNAT: "prng",
AWS_VPC_K8S_CNI_VETHPREFIX: "eni",
AWS_VPC_K8S_PLUGIN_LOG_FILE: "/var/log/aws-routed-eni/plugin.log",
AWS_VPC_K8S_PLUGIN_LOG_LEVEL: "DEBUG",
DISABLE_INTROSPECTION: "false",
DISABLE_METRICS: "false",
AWS_VPC_K8S_CNI_VETHPREFIX: "eni",
ADDITIONAL_ENI_TAGS: "{}",
AWS_VPC_K8S_CNI_CONFIGURE_RPFILTER: "false",
ENABLE_POD_ENI: "false",
MY_NODE_NAME: {
valueFrom: {
fieldRef: {fieldPath: "spec.nodeName"},
},
},
WARM_ENI_TARGET: "1",
},
env: [
{name: kv[0]} + if std.isObject(kv[1]) then kv[1] else {value: kv[1]}
18 changes: 12 additions & 6 deletions pkg/awsutils/awsutils.go
Original file line number Diff line number Diff line change
@@ -126,7 +126,7 @@ type APIs interface {
GetIPv4sFromEC2(eniID string) (addrList []*ec2.NetworkInterfacePrivateIpAddress, err error)

// DescribeAllENIs calls EC2 and returns the ENIMetadata and a tag map for each ENI
DescribeAllENIs() ([]ENIMetadata, map[string]TagMap, error)
DescribeAllENIs() (eniMetadata []ENIMetadata, tagMap map[string]TagMap, trunkENI string, err error)

// AllocIPAddress allocates an IP address for an ENI
AllocIPAddress(eniID string) error
@@ -729,6 +729,7 @@ func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string
} else {
log.Info("Using same config as the primary interface for the new ENI")
}

var sgs []string
for i := range input.Groups {
sgs = append(sgs, *input.Groups[i])
@@ -998,11 +999,11 @@ func (cache *EC2InstanceMetadataCache) GetIPv4sFromEC2(eniID string) (addrList [
}

// DescribeAllENIs calls EC2 to refrech the ENIMetadata and tags for all attached ENIs
func (cache *EC2InstanceMetadataCache) DescribeAllENIs() ([]ENIMetadata, map[string]TagMap, error) {
func (cache *EC2InstanceMetadataCache) DescribeAllENIs() (eniMetadata []ENIMetadata, tagMap map[string]TagMap, trunkENI string, err error) {
// Fetch all local ENI info from metadata
allENIs, err := cache.GetAttachedENIs()
if err != nil {
return nil, nil, errors.Wrap(err, "DescribeAllENIs: failed to get local ENI metadata")
return nil, nil, "", errors.Wrap(err, "DescribeAllENIs: failed to get local ENI metadata")
}

eniMap := make(map[string]ENIMetadata, len(allENIs))
@@ -1047,7 +1048,7 @@ func (cache *EC2InstanceMetadataCache) DescribeAllENIs() ([]ENIMetadata, map[str
}

if err != nil {
return nil, nil, err
return nil, nil, "", err
}

// Collect the verified ENIs
@@ -1057,13 +1058,18 @@ func (cache *EC2InstanceMetadataCache) DescribeAllENIs() ([]ENIMetadata, map[str
}

// Collect ENI response into ENI metadata and tags.
tagMap := make(map[string]TagMap, len(ec2Response.NetworkInterfaces))
tagMap = make(map[string]TagMap, len(ec2Response.NetworkInterfaces))
for _, ec2res := range ec2Response.NetworkInterfaces {
if ec2res.Attachment != nil && aws.Int64Value(ec2res.Attachment.DeviceIndex) == 0 && !aws.BoolValue(ec2res.Attachment.DeleteOnTermination) {
log.Warn("Primary ENI will not get deleted when node terminates because 'delete_on_termination' is set to false")
}
eniID := aws.StringValue(ec2res.NetworkInterfaceId)
eniMetadata := eniMap[eniID]
interfaceType := aws.StringValue(ec2res.InterfaceType)
// This assumes we only have one trunk attached to the node..
if interfaceType == "trunk" {
trunkENI = eniID
}
// Check IPv4 addresses
logOutOfSyncState(eniID, eniMetadata.IPv4Addresses, ec2res.PrivateIpAddresses)
tags := make(map[string]string, len(ec2res.TagSet))
@@ -1078,7 +1084,7 @@ func (cache *EC2InstanceMetadataCache) DescribeAllENIs() ([]ENIMetadata, map[str
tagMap[eniMetadata.ENIID] = tags
}
}
return verifiedENIs, tagMap, nil
return verifiedENIs, tagMap, trunkENI, nil
}

var eniErrorMessageRegex = regexp.MustCompile("'([a-zA-Z0-9-]+)'")
2 changes: 1 addition & 1 deletion pkg/awsutils/awsutils_test.go
Original file line number Diff line number Diff line change
@@ -393,7 +393,7 @@ func TestDescribeAllENIs(t *testing.T) {
for _, tc := range testCases {
mockEC2.EXPECT().DescribeNetworkInterfacesWithContext(gomock.Any(), gomock.Any(), gomock.Any()).Times(tc.n).Return(result, tc.awsErr)
ins := &EC2InstanceMetadataCache{ec2Metadata: mockMetadata, ec2SVC: mockEC2}
_, tags, err := ins.DescribeAllENIs()
_, tags, _, err := ins.DescribeAllENIs()
assert.Equal(t, tc.expErr, err, tc.name)
assert.Equal(t, tc.exptags, tags, tc.name)
}
7 changes: 4 additions & 3 deletions pkg/awsutils/mocks/awsutils_mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion pkg/eniconfig/eniconfig.go
Original file line number Diff line number Diff line change
@@ -126,12 +126,17 @@ func (h *Handler) Handle(ctx context.Context, event sdk.Event) error {
val = eniConfigDefault
}
}

// If value changes
if h.controller.myENI != val {
h.controller.eniLock.Lock()
defer h.controller.eniLock.Unlock()
h.controller.myENI = val
log.Debugf("Setting myENI to: %s", val)
if val != eniConfigDefault {
labels := o.GetLabels()
labels["vpc.amazonaws.com/eniConfig"] = val
o.SetLabels(labels)
}
}
}
}
3 changes: 2 additions & 1 deletion pkg/eniconfig/eniconfig_test.go
Original file line number Diff line number Diff line change
@@ -46,7 +46,8 @@ func updateNodeAnnotation(hdlr sdk.Handler, nodeName string, configName string,
node := corev1.Node{
TypeMeta: metav1.TypeMeta{APIVersion: corev1.SchemeGroupVersion.String()},
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
Name: nodeName,
Labels: make(map[string]string),
},
}
accessor, err := meta.Accessor(&node)
23 changes: 21 additions & 2 deletions pkg/ipamd/datastore/data_store.go
Original file line number Diff line number Diff line change
@@ -115,13 +115,15 @@ func (k IPAMKey) String() string {
return fmt.Sprintf("%s/%s/%s", k.NetworkName, k.ContainerID, k.IfName)
}

// ENI represents a single ENI.
// ENI represents a single ENI. Exported fields will be marshaled for introspection.
type ENI struct {
// AWS ENI ID
ID string
createTime time.Time
// IsPrimary indicates whether ENI is a primary ENI
IsPrimary bool
// IsTrunk indicates whether this ENI is used to provide pods with dedicated ENIs
IsTrunk bool
// DeviceNumber is the device number of ENI (0 means the primary ENI)
DeviceNumber int
// IPv4Addresses shows whether each address is assigned, the key is IP address, which must
@@ -355,7 +357,7 @@ func (ds *DataStore) writeBackingStoreUnsafe() error {
}

// AddENI add ENI to data store
func (ds *DataStore) AddENI(eniID string, deviceNumber int, isPrimary bool) error {
func (ds *DataStore) AddENI(eniID string, deviceNumber int, isPrimary, isTrunk bool) error {
ds.lock.Lock()
defer ds.lock.Unlock()

@@ -368,6 +370,7 @@ func (ds *DataStore) AddENI(eniID string, deviceNumber int, isPrimary bool) erro
ds.eniPool[eniID] = &ENI{
createTime: time.Now(),
IsPrimary: isPrimary,
IsTrunk: isTrunk,
ID: eniID,
DeviceNumber: deviceNumber,
IPv4Addresses: make(map[string]*AddressInfo)}
@@ -505,6 +508,17 @@ func (ds *DataStore) GetStats() (int, int) {
return ds.total, ds.assigned
}

func (ds *DataStore) GetTrunkENI() string {
ds.lock.Lock()
defer ds.lock.Unlock()
for _, eni := range ds.eniPool {
if eni.IsTrunk {
return eni.ID
}
}
return ""
}

// IsRequiredForWarmIPTarget determines if this ENI has warm IPs that are required to fulfill whatever WARM_IP_TARGET is
// set to.
func (ds *DataStore) isRequiredForWarmIPTarget(warmIPTarget int, eni *ENI) bool {
@@ -561,6 +575,11 @@ func (ds *DataStore) getDeletableENI(warmIPTarget int, minimumIPTarget int) *ENI
continue
}

if eni.IsTrunk {
ds.log.Debugf("ENI %s cannot be deleted because it is a trunk ENI", eni.ID)
continue
}

ds.log.Debugf("getDeletableENI: found a deletable ENI %s", eni.ID)
return eni
}
53 changes: 29 additions & 24 deletions pkg/ipamd/datastore/data_store_test.go
Original file line number Diff line number Diff line change
@@ -34,13 +34,13 @@ var log = logger.New(&logConfig)
func TestAddENI(t *testing.T) {
ds := NewDataStore(log, NullCheckpoint{})

err := ds.AddENI("eni-1", 1, true)
err := ds.AddENI("eni-1", 1, true, false)
assert.NoError(t, err)

err = ds.AddENI("eni-1", 1, true)
err = ds.AddENI("eni-1", 1, true, false)
assert.Error(t, err)

err = ds.AddENI("eni-2", 2, false)
err = ds.AddENI("eni-2", 2, false, false)
assert.NoError(t, err)

assert.Equal(t, len(ds.eniPool), 2)
@@ -52,13 +52,13 @@ func TestAddENI(t *testing.T) {
func TestDeleteENI(t *testing.T) {
ds := NewDataStore(log, NullCheckpoint{})

err := ds.AddENI("eni-1", 1, true)
err := ds.AddENI("eni-1", 1, true, false)
assert.NoError(t, err)

err = ds.AddENI("eni-2", 2, false)
err = ds.AddENI("eni-2", 2, false, false)
assert.NoError(t, err)

err = ds.AddENI("eni-3", 3, false)
err = ds.AddENI("eni-3", 3, false, false)
assert.NoError(t, err)

eniInfos := ds.GetENIInfos()
@@ -95,10 +95,10 @@ func TestDeleteENI(t *testing.T) {
func TestAddENIIPv4Address(t *testing.T) {
ds := NewDataStore(log, NullCheckpoint{})

err := ds.AddENI("eni-1", 1, true)
err := ds.AddENI("eni-1", 1, true, false)
assert.NoError(t, err)

err = ds.AddENI("eni-2", 2, false)
err = ds.AddENI("eni-2", 2, false, false)
assert.NoError(t, err)

err = ds.AddIPv4AddressToStore("eni-1", "1.1.1.1")
@@ -133,10 +133,10 @@ func TestAddENIIPv4Address(t *testing.T) {
func TestGetENIIPs(t *testing.T) {
ds := NewDataStore(log, NullCheckpoint{})

err := ds.AddENI("eni-1", 1, true)
err := ds.AddENI("eni-1", 1, true, false)
assert.NoError(t, err)

err = ds.AddENI("eni-2", 2, false)
err = ds.AddENI("eni-2", 2, false, false)
assert.NoError(t, err)

err = ds.AddIPv4AddressToStore("eni-1", "1.1.1.1")
@@ -165,7 +165,7 @@ func TestGetENIIPs(t *testing.T) {

func TestDelENIIPv4Address(t *testing.T) {
ds := NewDataStore(log, NullCheckpoint{})
err := ds.AddENI("eni-1", 1, true)
err := ds.AddENI("eni-1", 1, true, false)
assert.NoError(t, err)

err = ds.AddIPv4AddressToStore("eni-1", "1.1.1.1")
@@ -218,11 +218,14 @@ func TestPodIPv4Address(t *testing.T) {
checkpoint := NewTestCheckpoint(struct{}{})
ds := NewDataStore(log, checkpoint)

ds.AddENI("eni-1", 1, true)
err := ds.AddENI("eni-1", 1, true, false)
assert.NoError(t, err)

ds.AddENI("eni-2", 2, false)
err = ds.AddENI("eni-2", 2, false, false)
assert.NoError(t, err)

ds.AddIPv4AddressToStore("eni-1", "1.1.1.1")
err = ds.AddIPv4AddressToStore("eni-1", "1.1.1.1")
assert.NoError(t, err)

key1 := IPAMKey{"net0", "sandbox-1", "eth0"}
ip, _, err := ds.AssignPodIPv4Address(key1)
@@ -242,7 +245,8 @@ func TestPodIPv4Address(t *testing.T) {
podsInfos := ds.AllocatedIPs()
assert.Equal(t, len(podsInfos), 1)

ds.AddIPv4AddressToStore("eni-2", "1.1.2.2")
err = ds.AddIPv4AddressToStore("eni-2", "1.1.2.2")
assert.NoError(t, err)

// duplicate add
ip, _, err = ds.AssignPodIPv4Address(key1) // same id
@@ -280,7 +284,8 @@ func TestPodIPv4Address(t *testing.T) {
podsInfos = ds.AllocatedIPs()
assert.Equal(t, len(podsInfos), 2)

ds.AddIPv4AddressToStore("eni-1", "1.1.1.2")
err = ds.AddIPv4AddressToStore("eni-1", "1.1.1.2")
assert.NoError(t, err)

key3 := IPAMKey{"net0", "sandbox-3", "eth0"}
ip, _, err = ds.AssignPodIPv4Address(key3)
@@ -328,23 +333,23 @@ func TestPodIPv4Address(t *testing.T) {
func TestWarmENIInteractions(t *testing.T) {
ds := NewDataStore(log, NullCheckpoint{})

ds.AddENI("eni-1", 1, true)
ds.AddENI("eni-2", 2, false)
ds.AddENI("eni-3", 3, false)
_ = ds.AddENI("eni-1", 1, true, false)
_ = ds.AddENI("eni-2", 2, false, false)
_ = ds.AddENI("eni-3", 3, false, false)

ds.AddIPv4AddressToStore("eni-1", "1.1.1.1")
_ = ds.AddIPv4AddressToStore("eni-1", "1.1.1.1")
key1 := IPAMKey{"net0", "sandbox-1", "eth0"}
_, _, err := ds.AssignPodIPv4Address(key1)
assert.NoError(t, err)

ds.AddIPv4AddressToStore("eni-1", "1.1.1.2")
_ = ds.AddIPv4AddressToStore("eni-1", "1.1.1.2")
key2 := IPAMKey{"net0", "sandbox-2", "eth0"}
_, _, err = ds.AssignPodIPv4Address(key2)
assert.NoError(t, err)

ds.AddIPv4AddressToStore("eni-2", "1.1.2.1")
ds.AddIPv4AddressToStore("eni-2", "1.1.2.2")
ds.AddIPv4AddressToStore("eni-3", "1.1.3.1")
_ = ds.AddIPv4AddressToStore("eni-2", "1.1.2.1")
_ = ds.AddIPv4AddressToStore("eni-2", "1.1.2.2")
_ = ds.AddIPv4AddressToStore("eni-3", "1.1.3.1")

noWarmIPTarget := 0

178 changes: 164 additions & 14 deletions pkg/ipamd/ipamd.go
Original file line number Diff line number Diff line change
@@ -24,6 +24,9 @@ import (
"sync/atomic"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/aws/amazon-vpc-cni-k8s/pkg/awsutils"
"github.com/aws/amazon-vpc-cni-k8s/pkg/eniconfig"
"github.com/aws/amazon-vpc-cni-k8s/pkg/ipamd/datastore"
@@ -114,6 +117,12 @@ const (
// Specify where ipam should persist its current IP<->container allocations.
envBackingStorePath = "AWS_VPC_K8S_CNI_BACKING_STORE"
defaultBackingStorePath = "/var/run/aws-node/ipam.json"

// envEnablePodENI is used to attach a Trunk ENI to every node. Required in order to give Branch ENIs to pods.
envEnablePodENI = "ENABLE_POD_ENI"

// vpcENIConfigLabel is used by the VPC resource controller to pick the right ENI config.
vpcENIConfigLabel = "vpc.amazonaws.com/eniConfig"
)

var log = logger.Get()
@@ -165,6 +174,13 @@ var (
},
[]string{"reason"},
)
podENIErr = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "awscni_pod_eni_error_count",
Help: "The number of errors encountered for pod ENIs",
},
[]string{"fn"},
)
prometheusRegistered = false
)

@@ -191,6 +207,8 @@ type IPAMContext struct {
reconcileCooldownCache ReconcileCooldownCache
terminating int32 // Flag to warn that the pod is about to shut down.
disableENIProvisioning bool
enablePodENI bool
myNodeName string
}

// UnmanagedENISet keeps a set of ENI IDs for ENIs tagged with "node.k8s.amazonaws.com/no_manage"
@@ -282,6 +300,7 @@ func prometheusRegister() {
prometheus.MustRegister(reconcileCnt)
prometheus.MustRegister(addIPCnt)
prometheus.MustRegister(delIPCnt)
prometheus.MustRegister(podENIErr)
prometheusRegistered = true
}
}
@@ -309,6 +328,8 @@ func New(k8sapiClient kubernetes.Interface, eniConfig *eniconfig.ENIConfigContro
c.minimumIPTarget = getMinimumIPTarget()
c.useCustomNetworking = UseCustomNetworkCfg()
c.disableENIProvisioning = disablingENIProvisioning()
c.enablePodENI = enablePodENI()
c.myNodeName = os.Getenv("MY_NODE_NAME")

checkpointer := datastore.NewJSONFile(dsBackingStorePath())
c.dataStore = datastore.NewDataStore(log, checkpointer)
@@ -340,12 +361,12 @@ func (c *IPAMContext) nodeInit() error {

vpcCIDRs := c.awsClient.GetVPCIPv4CIDRs()
primaryIP := net.ParseIP(c.awsClient.GetLocalIPv4())
err = c.networkClient.SetupHostNetwork(vpcCIDRs, c.awsClient.GetPrimaryENImac(), &primaryIP)
err = c.networkClient.SetupHostNetwork(vpcCIDRs, c.awsClient.GetPrimaryENImac(), &primaryIP, c.enablePodENI)
if err != nil {
return errors.Wrap(err, "ipamd init: failed to set up host network")
}

eniMetadata, tagMap, err := c.awsClient.DescribeAllENIs()
eniMetadata, tagMap, trunkENI, err := c.awsClient.DescribeAllENIs()
if err != nil {
return errors.New("ipamd init: failed to retrieve attached ENIs info")
}
@@ -359,7 +380,7 @@ func (c *IPAMContext) nodeInit() error {
retry := 0
for {
retry++
if err = c.setupENI(eni.ENIID, eni); err == nil {
if err = c.setupENI(eni.ENIID, eni, trunkENI); err == nil {
log.Infof("ENI %s set up.", eni.ENIID)
break
}
@@ -389,6 +410,39 @@ func (c *IPAMContext) nodeInit() error {
return err
}

if c.useCustomNetworking && c.eniConfig.Getter().MyENI != "default" {
// Signal to VPC Resource Controller that the node is using custom networking
err := c.SetNodeLabel(vpcENIConfigLabel, c.eniConfig.Getter().MyENI)
if err != nil {
log.Errorf("Failed to set eniConfig node label", err)
podENIErrInc("nodeInit")
return err
}
} else {
// Remove the custom networking label
err := c.SetNodeLabel(vpcENIConfigLabel, "")
if err != nil {
log.Errorf("Failed to delete eniConfig node label", err)
podENIErrInc("nodeInit")
return err
}
}

// If we started on a node with a trunk ENI already attached, add the node label.
if trunkENI != "" {
// Signal to VPC Resource Controller that the node has a trunk already
err := c.SetNodeLabel("vpc.amazonaws.com/has-trunk-attached", "true")
if err != nil {
log.Errorf("Failed to set node label", err)
podENIErrInc("nodeInit")
// If this fails, we probably can't talk to the API server. Let the pod restart
return err
}
} else {
// Check if we want to ask for one
c.askForTrunkENIIfNeeded()
}

// For a new node, attach IPs
increasedPool, err := c.tryAssignIPs()
if err == nil && increasedPool {
@@ -451,6 +505,7 @@ func (c *IPAMContext) StartNodeIPPoolManager() {
}

func (c *IPAMContext) updateIPPoolIfRequired() {
c.askForTrunkENIIfNeeded()
if c.nodeIPPoolTooLow() {
c.increaseIPPool()
} else if c.nodeIPPoolTooHigh() {
@@ -589,13 +644,19 @@ func (c *IPAMContext) increaseIPPool() {
if increasedPool {
c.updateLastNodeIPPoolAction()
} else {
// Check if we need to make room for the VPC Resource Controller to attach a trunk ENI
reserveSlotForTrunkENI := 0
if c.enablePodENI && c.dataStore.GetTrunkENI() == "" {
reserveSlotForTrunkENI = 1
}
// If we did not add an IP, try to add an ENI instead.
if c.dataStore.GetENIs() < (c.maxENI - c.unmanagedENI) {
if c.dataStore.GetENIs() < (c.maxENI - c.unmanagedENI - reserveSlotForTrunkENI) {
if err = c.tryAllocateENI(); err == nil {
c.updateLastNodeIPPoolAction()
}
} else {
log.Debugf("Skipping ENI allocation as the instance's max ENI limit of %d is already reached (accounting for %d unmanaged ENIs)", c.maxENI, c.unmanagedENI)
log.Debugf("Skipping ENI allocation as the max ENI limit of %d is already reached (accounting for %d unmanaged ENIs and %d trunk ENIs)",
c.maxENI, c.unmanagedENI, reserveSlotForTrunkENI)
}
}
}
@@ -653,14 +714,13 @@ func (c *IPAMContext) tryAllocateENI() error {
log.Errorf("Failed to increase pool size: Unable to discover attached ENI from metadata service %v", err)
return err
}

err = c.setupENI(eni, eniMetadata)
err = c.setupENI(eni, eniMetadata, c.dataStore.GetTrunkENI())
if err != nil {
ipamdErrInc("increaseIPPoolsetupENIFailed")
log.Errorf("Failed to increase pool size: %v", err)
return err
}
return nil
return err
}

// For an ENI, try to fill in missing IPs on an existing ENI
@@ -702,9 +762,9 @@ func (c *IPAMContext) tryAssignIPs() (increasedPool bool, err error) {
// 1) add ENI to datastore
// 2) set up linux ENI related networking stack.
// 3) add all ENI's secondary IP addresses to datastore
func (c *IPAMContext) setupENI(eni string, eniMetadata awsutils.ENIMetadata) error {
func (c *IPAMContext) setupENI(eni string, eniMetadata awsutils.ENIMetadata, trunkENI string) error {
// Add the ENI to the datastore
err := c.dataStore.AddENI(eni, eniMetadata.DeviceNumber, eni == c.awsClient.GetPrimaryENI())
err := c.dataStore.AddENI(eni, eniMetadata.DeviceNumber, eni == c.awsClient.GetPrimaryENI(), eni == trunkENI)
if err != nil && err.Error() != datastore.DuplicatedENIError {
return errors.Wrapf(err, "failed to add ENI %s to data store", eni)
}
@@ -812,6 +872,18 @@ func logPoolStats(total, used, maxAddrsPerENI int) {
log.Debugf("IP pool stats: total = %d, used = %d, c.maxIPsPerENI = %d", total, used, maxAddrsPerENI)
}

func (c *IPAMContext) askForTrunkENIIfNeeded() {
if c.enablePodENI && c.dataStore.GetTrunkENI() == "" {
// We need to signal that VPC Resource Controller needs to attach a trunk ENI
err := c.SetNodeLabel("vpc.amazonaws.com/has-trunk-attached", "false")
if err != nil {
podENIErrInc("askForTrunkENIIfNeeded")
log.Errorf("Failed to set node label", err)
}
return
}
}

// nodeIPPoolTooLow returns true if IP pool is below low threshold
func (c *IPAMContext) nodeIPPoolTooLow() bool {
short, _, warmIPTargetDefined := c.ipTargetState()
@@ -864,6 +936,10 @@ func ipamdErrInc(fn string) {
ipamdErr.With(prometheus.Labels{"fn": fn}).Inc()
}

func podENIErrInc(fn string) {
podENIErr.With(prometheus.Labels{"fn": fn}).Inc()
}

// nodeIPPoolReconcile reconcile ENI and IP info from metadata service and IP addresses in datastore
func (c *IPAMContext) nodeIPPoolReconcile(interval time.Duration) {
curTime := time.Now()
@@ -884,8 +960,9 @@ func (c *IPAMContext) nodeIPPoolReconcile(interval time.Duration) {
}
attachedENIs := c.filterUnmanagedENIs(allENIs)
currentENIIPPools := c.dataStore.GetENIInfos().ENIs
trunkENI := c.dataStore.GetTrunkENI()

// Check if a new ENI was added, if so we need to update the tags
// Check if a new ENI was added, if so we need to update the tags.
needToUpdateTags := false
for _, attachedENI := range attachedENIs {
if _, ok := currentENIIPPools[attachedENI.ENIID]; !ok {
@@ -895,11 +972,23 @@ func (c *IPAMContext) nodeIPPoolReconcile(interval time.Duration) {
}
if needToUpdateTags {
log.Debugf("A new ENI added but not by ipamd, updating tags")
allENIs, tagMap, err := c.awsClient.DescribeAllENIs()
allENIs, tagMap, trunk, err := c.awsClient.DescribeAllENIs()
if err != nil {
log.Warnf("Failed to call EC2 to describe ENIs, aborting reconcile: %v", err)
return
}

if c.enablePodENI && trunk != "" {
// Label the node that we have a trunk
err = c.SetNodeLabel("vpc.amazonaws.com/has-trunk-attached", "true")
if err != nil {
podENIErrInc("askForTrunkENIIfNeeded")
log.Errorf("Failed to set node label for trunk. Aborting reconcile", err)
return
}
}
// Update trunk ENI
trunkENI = trunk
c.setUnmanagedENIs(tagMap)
attachedENIs = c.filterUnmanagedENIs(allENIs)
}
@@ -919,7 +1008,7 @@ func (c *IPAMContext) nodeIPPoolReconcile(interval time.Duration) {

// Add new ENI
log.Debugf("Reconcile and add a new ENI %s", attachedENI)
err = c.setupENI(attachedENI.ENIID, attachedENI)
err = c.setupENI(attachedENI.ENIID, attachedENI, trunkENI)
if err != nil {
log.Errorf("IP pool reconcile: Failed to set up ENI %s network: %v", attachedENI.ENIID, err)
ipamdErrInc("eniReconcileAdd")
@@ -1079,7 +1168,11 @@ func getMinimumIPTarget() int {
}

func disablingENIProvisioning() bool {
return getEnvBoolWithDefault(envDisableENIProvisioning, noDisableENIProvisioning)
return getEnvBoolWithDefault(envDisableENIProvisioning, false)
}

func enablePodENI() bool {
return getEnvBoolWithDefault(envEnablePodENI, false)
}

// filterUnmanagedENIs filters out ENIs marked with the "node.k8s.amazonaws.com/no_manage" tag
@@ -1158,3 +1251,60 @@ func min(x, y int) int {
}
return x
}

func (c *IPAMContext) getTrunkLinkIndex() (int, error) {
trunkENI := c.dataStore.GetTrunkENI()
attachedENIs, err := c.awsClient.GetAttachedENIs()
if err != nil {
return -1, err
}
for _, eni := range attachedENIs {
if eni.ENIID == trunkENI {
retryLinkByMacInterval := 100 * time.Millisecond
link, err := c.networkClient.GetLinkByMac(eni.MAC, retryLinkByMacInterval)
if err != nil {
return -1, err
}
return link.Attrs().Index, nil

}
}
return -1, errors.New("No trunk!")
}

func (c *IPAMContext) SetNodeLabel(key, value string) error {
// Find my node
node, err := c.k8sClient.CoreV1().Nodes().Get(c.myNodeName, metav1.GetOptions{})
if err != nil {
log.Errorf("Failed to get node: %v", err)
return err
}

if labelValue, ok := node.Labels[key]; ok && labelValue == value {
log.Debugf("Node label %q is already %q", key, labelValue)
return nil
}
// Make deep copy for modification
updateNode := node.DeepCopy()

// Set node label
if value != "" {
updateNode.Labels[key] = value
} else {
// Empty value, delete the label
log.Debugf("Deleting label %q", key)
delete(updateNode.Labels, key)
}

// Update node status to advertise the resource.
_, err = c.k8sClient.CoreV1().Nodes().Update(updateNode)
if err != nil {
log.Errorf("Failed to update node %s with label %q: %q, error: %v", c.myNodeName, key, value, err)
}
log.Infof("Updated node %s with label %q: %q", c.myNodeName, key, value)
return nil
}

func (c *IPAMContext) GetPod(podName, namespace string) (*v1.Pod, error) {
return c.k8sClient.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{})
}
38 changes: 33 additions & 5 deletions pkg/ipamd/ipamd_test.go
Original file line number Diff line number Diff line change
@@ -20,6 +20,9 @@ import (
"reflect"
"testing"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/aws/amazon-vpc-cni-k8s/pkg/apis/crd/v1alpha1"
"github.com/aws/amazon-vpc-cni-k8s/pkg/awsutils"
mock_awsutils "github.com/aws/amazon-vpc-cni-k8s/pkg/awsutils/mocks"
@@ -49,6 +52,7 @@ const (
ipaddr11 = "10.10.20.11"
ipaddr12 = "10.10.20.12"
vpcCIDR = "10.10.0.0/16"
myNodeName = "testNodeName"
)

type testMocks struct {
@@ -83,6 +87,7 @@ func TestNodeInit(t *testing.T) {

mockContext := &IPAMContext{
awsClient: m.awsutils,
k8sClient: m.clientset,
maxIPsPerENI: 14,
maxENI: 4,
warmENITarget: 1,
@@ -91,6 +96,7 @@ func TestNodeInit(t *testing.T) {
terminating: int32(0),
networkClient: m.network,
dataStore: datastore.NewDataStore(log, datastore.NewTestCheckpoint(fakeCheckpoint)),
myNodeName: myNodeName,
}

eni1, eni2 := getDummyENIMetadata()
@@ -104,12 +110,12 @@ func TestNodeInit(t *testing.T) {
primaryIP := net.ParseIP(ipaddr01)
m.awsutils.EXPECT().GetVPCIPv4CIDRs().AnyTimes().Return(cidrs)
m.awsutils.EXPECT().GetPrimaryENImac().Return("")
m.network.EXPECT().SetupHostNetwork(cidrs, "", &primaryIP).Return(nil)
m.network.EXPECT().SetupHostNetwork(cidrs, "", &primaryIP, false).Return(nil)

m.awsutils.EXPECT().GetPrimaryENI().AnyTimes().Return(primaryENIid)

eniMetadataSlice := []awsutils.ENIMetadata{eni1, eni2}
m.awsutils.EXPECT().DescribeAllENIs().Return(eniMetadataSlice, map[string]awsutils.TagMap{}, nil)
m.awsutils.EXPECT().DescribeAllENIs().Return(eniMetadataSlice, map[string]awsutils.TagMap{}, "", nil)
m.network.EXPECT().SetupENINetwork(gomock.Any(), secMAC, secDevice, secSubnet)

m.awsutils.EXPECT().GetLocalIPv4().Return(ipaddr01)
@@ -119,6 +125,15 @@ func TestNodeInit(t *testing.T) {

m.network.EXPECT().UseExternalSNAT().Return(false)
m.network.EXPECT().UpdateRuleListBySrc(gomock.Any(), gomock.Any(), gomock.Any(), true)

fakeNode := v1.Node{
TypeMeta: metav1.TypeMeta{Kind: "Node"},
ObjectMeta: metav1.ObjectMeta{Name: myNodeName},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{},
}
_, _ = m.clientset.CoreV1().Nodes().Create(&fakeNode)

// Add IPs
m.awsutils.EXPECT().AllocIPAddresses(gomock.Any(), gomock.Any())

@@ -360,7 +375,7 @@ func TestNodeIPPoolReconcile(t *testing.T) {
}
m.awsutils.EXPECT().GetAttachedENIs().Return(eniMetadata, nil)
m.awsutils.EXPECT().GetPrimaryENI().Times(2).Return(primaryENIid)
m.awsutils.EXPECT().DescribeAllENIs().Return(eniMetadata, map[string]awsutils.TagMap{}, nil)
m.awsutils.EXPECT().DescribeAllENIs().Return(eniMetadata, map[string]awsutils.TagMap{}, "", nil)

mockContext.nodeIPPoolReconcile(0)

@@ -437,7 +452,7 @@ func TestGetWarmIPTargetState(t *testing.T) {
assert.Equal(t, 0, over)

// add 2 addresses to datastore
_ = mockContext.dataStore.AddENI("eni-1", 1, true)
_ = mockContext.dataStore.AddENI("eni-1", 1, true, false)
_ = mockContext.dataStore.AddIPv4AddressToStore("eni-1", "1.1.1.1")
_ = mockContext.dataStore.AddIPv4AddressToStore("eni-1", "1.1.1.2")

@@ -508,7 +523,7 @@ func testDatastore() *datastore.DataStore {

func datastoreWith3FreeIPs() *datastore.DataStore {
datastoreWith3FreeIPs := testDatastore()
_ = datastoreWith3FreeIPs.AddENI(primaryENIid, 1, true)
_ = datastoreWith3FreeIPs.AddENI(primaryENIid, 1, true, false)
_ = datastoreWith3FreeIPs.AddIPv4AddressToStore(primaryENIid, ipaddr01)
_ = datastoreWith3FreeIPs.AddIPv4AddressToStore(primaryENIid, ipaddr02)
_ = datastoreWith3FreeIPs.AddIPv4AddressToStore(primaryENIid, ipaddr03)
@@ -585,3 +600,16 @@ func TestDisablingENIProvisioning(t *testing.T) {
disabled = disablingENIProvisioning()
assert.False(t, disabled)
}

func TestPodENIConfigFlag(t *testing.T) {
m := setup(t)
defer m.ctrl.Finish()

_ = os.Setenv(envEnablePodENI, "true")
disabled := enablePodENI()
assert.True(t, disabled)

_ = os.Unsetenv(envEnablePodENI)
disabled = enablePodENI()
assert.False(t, disabled)
}
117 changes: 109 additions & 8 deletions pkg/ipamd/rpc_handler.go
Original file line number Diff line number Diff line change
@@ -14,9 +14,11 @@
package ipamd

import (
"encoding/json"
"net"
"os"
"os/signal"
"strings"
"syscall"

"github.com/pkg/errors"
@@ -28,7 +30,9 @@ import (
"google.golang.org/grpc/reflection"

"github.com/aws/amazon-vpc-cni-k8s/pkg/ipamd/datastore"
"github.com/aws/amazon-vpc-cni-k8s/pkg/networkutils"
"github.com/aws/amazon-vpc-cni-k8s/rpc"
k8serror "k8s.io/apimachinery/pkg/api/errors"
)

const (
@@ -41,18 +45,88 @@ type server struct {
ipamContext *IPAMContext
}

// PodENIData is used to parse the list of ENIs in the branch ENI pod annotation
type PodENIData struct {
ENIID string `json:"eniId"`
IfAddress string `json:"ifAddress"`
PrivateIP string `json:"privateIp"`
VlanID int `json:"vlanId"`
SubnetCIDR string `json:"subnetCidr"`
}

// AddNetwork processes CNI add network request and return an IP address for container
func (s *server) AddNetwork(ctx context.Context, in *rpc.AddNetworkRequest) (*rpc.AddNetworkReply, error) {
log.Infof("Received AddNetwork for NS %s, Sandbox %s, ifname %s",
in.Netns, in.ContainerID, in.IfName)
addIPCnt.Inc()

ipamKey := datastore.IPAMKey{
ContainerID: in.ContainerID,
IfName: in.IfName,
NetworkName: in.NetworkName,
failureResponse := rpc.AddNetworkReply{Success: false}
var deviceNumber, vlanId, trunkENILinkIndex int
var addr, branchENIMAC, podENISubnetGW string
var err error
if s.ipamContext.enablePodENI {
// Check pod spec for Branch ENI
pod, err := s.ipamContext.GetPod(in.K8S_POD_NAME, in.K8S_POD_NAMESPACE)
if err != nil {
log.Warnf("Send AddNetworkReply: Failed to get pod: %v", err)
return &failureResponse, nil
}
limits := pod.Spec.Containers[0].Resources.Limits
for resName := range limits {
if strings.HasPrefix(string(resName), "vpc.amazonaws.com/pod-eni") {
// Check that we have a trunk
trunkENI := s.ipamContext.dataStore.GetTrunkENI()
if trunkENI == "" {
log.Warn("Send AddNetworkReply: No trunk ENI found, cannot add a pod ENI")
return &failureResponse, nil
} else {
trunkENILinkIndex, err = s.ipamContext.getTrunkLinkIndex()
if err != nil {
log.Warn("Send AddNetworkReply: No trunk ENI Link Index found, cannot add a pod ENI")
return &failureResponse, nil
}
}
val, branch := pod.Annotations["vpc.amazonaws.com/pod-eni"]
if branch {
// Parse JSON data
var podENIData []PodENIData
err := json.Unmarshal([]byte(val), &podENIData)
if err != nil || len(podENIData) < 1 {
log.Errorf("Failed to unmarshal PodENIData JSON: %v", err)
return &failureResponse, nil
}
firstENI := podENIData[0]
addr = firstENI.PrivateIP
branchENIMAC = firstENI.IfAddress
vlanId = firstENI.VlanID
if addr == "" || branchENIMAC == "" || vlanId == 0 {
log.Errorf("Failed to parse pod-ENI annotation: %s", val)
return &failureResponse, nil
}
currentGW := strings.Split(firstENI.SubnetCIDR, "/")[0]
// Increment value CIDR value
nextGWIP, err := networkutils.IncrementIPv4Addr(net.ParseIP(currentGW))
if err != nil {
log.Errorf("Unable to get next Gateway IP for branch ENI from %s: %v", currentGW, err)
return &failureResponse, nil
}
podENISubnetGW = nextGWIP.String()
deviceNumber = -1 // Not needed for branch ENI, they depend on trunkENIDeviceIndex
} else {
log.Infof("Send AddNetworkReply: failed to get Branch ENI resource")
return &failureResponse, nil
}
}
}
}
if addr == "" {
ipamKey := datastore.IPAMKey{
ContainerID: in.ContainerID,
IfName: in.IfName,
NetworkName: in.NetworkName,
}
addr, deviceNumber, err = s.ipamContext.dataStore.AssignPodIPv4Address(ipamKey)
}
addr, deviceNumber, err := s.ipamContext.dataStore.AssignPodIPv4Address(ipamKey)

pbVPCcidrs := s.ipamContext.awsClient.GetVPCIPv4CIDRs()
for _, cidr := range pbVPCcidrs {
log.Debugf("VPC CIDR %s", cidr)
@@ -72,17 +146,44 @@ func (s *server) AddNetwork(ctx context.Context, in *rpc.AddNetworkRequest) (*rp
DeviceNumber: int32(deviceNumber),
UseExternalSNAT: useExternalSNAT,
VPCcidrs: pbVPCcidrs,
PodVlanId: int32(vlanId),
PodENIMAC: branchENIMAC,
PodENISubnetGW: podENISubnetGW,
ParentIfIndex: int32(trunkENILinkIndex),
}

log.Infof("Send AddNetworkReply: IPv4Addr %s, DeviceNumber: %d, err: %v", addr, deviceNumber, err)
addIPCnt.Inc()
return &resp, nil
}

func (s *server) DelNetwork(ctx context.Context, in *rpc.DelNetworkRequest) (*rpc.DelNetworkReply, error) {
log.Infof("Received DelNetwork for Sandbox %s", in.ContainerID)
delIPCnt.With(prometheus.Labels{"reason": in.Reason}).Inc()

if s.ipamContext.enablePodENI {
pod, err := s.ipamContext.GetPod(in.K8S_POD_NAME, in.K8S_POD_NAMESPACE)
if err != nil {
if k8serror.IsNotFound(err) {
log.Warn("Send AddNetworkReply: pod not found")
return &rpc.DelNetworkReply{Success: true}, nil
}
log.Warnf("Send DelNetworkReply: Failed to get pod spec: %v", err)
return &rpc.DelNetworkReply{Success: false}, err
} else {
val, branch := pod.Annotations["vpc.amazonaws.com/pod-eni"]
if branch {
// Parse JSON data
var podENIData []PodENIData
err := json.Unmarshal([]byte(val), &podENIData)
if err != nil || len(podENIData) < 1 {
log.Errorf("Failed to unmarshal PodENIData JSON: %v", err)
}
return &rpc.DelNetworkReply{
Success: true,
PodVlanId: int32(podENIData[0].VlanID),
IPv4Addr: podENIData[0].PrivateIP}, err
}
}
}
ipamKey := datastore.IPAMKey{
ContainerID: in.ContainerID,
IfName: in.IfName,
46 changes: 46 additions & 0 deletions pkg/k8sapi/discovery.go
Original file line number Diff line number Diff line change
@@ -34,6 +34,12 @@ const (
cniPodName = "aws-node"
)

// K8SAPIs defines interface to use kubelet introspection API
type K8SAPIs interface {
SetNodeLabel(key, value string) error
GetPod(podName, namespace string) (*v1.Pod, error)
}

// K8SPodInfo provides pod info
type K8SPodInfo struct {
// Name is pod's name
@@ -105,6 +111,9 @@ func (d *Controller) GetCNIPods() []string {
return cniPods
}

// TODO: Add DiscoverNodeLabels() watcher function.
// TODO: Or is the right solution to look for a ConfigMap?

// DiscoverCNIK8SPods discovers CNI pods, aws-node, running in the cluster
func (d *Controller) DiscoverCNIK8SPods() {
// create the pod watcher
@@ -154,6 +163,43 @@ func (d *Controller) DiscoverK8SPods(podListWatcher *cache.ListWatch) {
select {}
}

func (d *Controller) SetNodeLabel(key, value string) error {
// Find my node
node, err := d.kubeClient.CoreV1().Nodes().Get(d.myNodeName, metav1.GetOptions{})
if err != nil {
log.Errorf("Failed to get node: %v", err)
return err
}

if labelValue, ok := node.Labels[key]; ok && labelValue == value {
log.Debugf("Node label %q is already %q", key, labelValue)
return nil
}
// Make deep copy for modification
updateNode := node.DeepCopy()

// Set node label
if value != "" {
updateNode.Labels[key] = value
} else {
// Empty value, delete the label
log.Debugf("Deleting label %q", key)
delete(updateNode.Labels, key)
}

// Update node status to advertise the resource.
_, err = d.kubeClient.CoreV1().Nodes().Update(updateNode)
if err != nil {
log.Errorf("Failed to update node %s with label %q: %q, error: %v", d.myNodeName, key, value, err)
}
log.Infof("Updated node %s with label %q: %q", d.myNodeName, key, value)
return nil
}

func (d *Controller) GetPod(podName, namespace string) (*v1.Pod, error) {
return d.kubeClient.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{})
}

// The rest of logic/code are taken from kubernetes/client-go/examples/workqueue
func newController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *controller {
return &controller{
94 changes: 94 additions & 0 deletions pkg/k8sapi/mocks/k8sapi_mocks.go
24 changes: 20 additions & 4 deletions pkg/networkutils/mocks/network_mocks.go
69 changes: 59 additions & 10 deletions pkg/networkutils/network.go
Original file line number Diff line number Diff line change
@@ -42,6 +42,9 @@ import (
)

const (
// Local rule, needs to come after the pod ENI rules
localRulePriority = 20

// 513 - 1023, can be used priority lower than toPodRulePriority but higher than default nonVPC CIDR rule

// 1024 is reserved for (ip rule not to <VPC's subnet> table main)
@@ -50,8 +53,12 @@ const (
// 1025 - 1535 can be used priority lower than fromPodRulePriority but higher than default nonVPC CIDR rule
fromPodRulePriority = 1536

// Main route table
mainRoutingTable = unix.RT_TABLE_MAIN

// Local route table
localRouteTable = unix.RT_TABLE_LOCAL

// This environment is used to specify whether an external NAT gateway will be used to provide SNAT of
// secondary ENI IP addresses. If set to "true", the SNAT iptables rule and off-VPC ip rule will not
// be installed and will be removed if they are already installed. Defaults to false.
@@ -113,7 +120,7 @@ var log = logger.Get()
// NetworkAPIs defines the host level and the ENI level network related operations
type NetworkAPIs interface {
// SetupNodeNetwork performs node level network configuration
SetupHostNetwork(vpcCIDRs []string, primaryMAC string, primaryAddr *net.IP) error
SetupHostNetwork(vpcCIDRs []string, primaryMAC string, primaryAddr *net.IP, enablePodENI bool) error
// SetupENINetwork performs eni level network configuration
SetupENINetwork(eniIP string, mac string, table int, subnetCIDR string) error
UseExternalSNAT() bool
@@ -122,6 +129,7 @@ type NetworkAPIs interface {
GetRuleListBySrc(ruleList []netlink.Rule, src net.IPNet) ([]netlink.Rule, error)
UpdateRuleListBySrc(ruleList []netlink.Rule, src net.IPNet, toCIDRs []string, toFlag bool) error
DeleteRuleListBySrc(src net.IPNet) error
GetLinkByMac(mac string, retryInterval time.Duration) (netlink.Link, error)
}

type linuxNetwork struct {
@@ -205,7 +213,7 @@ func findPrimaryInterfaceName(primaryMAC string) (string, error) {
}

// SetupHostNetwork performs node level network configuration
func (n *linuxNetwork) SetupHostNetwork(vpcCIDRs []string, primaryMAC string, primaryAddr *net.IP) error {
func (n *linuxNetwork) SetupHostNetwork(vpcCIDRs []string, primaryMAC string, primaryAddr *net.IP, enablePodENI bool) error {
log.Info("Setting up host network... ")

var err error
@@ -238,7 +246,7 @@ func (n *linuxNetwork) SetupHostNetwork(vpcCIDRs []string, primaryMAC string, pr
}
}

link, err := LinkByMac(primaryMAC, n.netLink, retryLinkByMacInterval)
link, err := linkByMac(primaryMAC, n.netLink, retryLinkByMacInterval)
if err != nil {
return errors.Wrapf(err, "setupHostNetwork: failed to find the link primary ENI with MAC address %s", primaryMAC)
}
@@ -271,6 +279,25 @@ func (n *linuxNetwork) SetupHostNetwork(vpcCIDRs []string, primaryMAC string, pr
}
}

// If we want per pod ENIs, we need to give pod ENIs veth bridges a lower priority that the local table,
// or the rp_filter check will fail.
if enablePodENI {
localRule := n.netLink.NewRule()
localRule.Table = localRouteTable
localRule.Priority = localRulePriority
// Add new rule with higher priority
err := n.netLink.RuleAdd(localRule)
if err != nil && !isRuleExistsError(err) {
return errors.Wrap(err, "ChangeLocalRulePriority: unable to update local rule priority")
}
// Delete the priority 0 rule
localRule.Priority = 0
err = n.netLink.RuleDel(localRule)
if err != nil && !containsNoSuchRule(err) {
return errors.Wrap(err, "ChangeLocalRulePriority: failed to delete priority 0 local rule")
}
}

ipt, err := n.newIptables()
if err != nil {
return errors.Wrap(err, "host network setup: failed to create iptables")
@@ -420,6 +447,17 @@ func (n *linuxNetwork) SetupHostNetwork(vpcCIDRs []string, primaryMAC string, pr
},
})

iptableRules = append(iptableRules, iptablesRule{
name: "connmark restore for primary ENI from vlan",
shouldExist: n.nodePortSupportEnabled,
table: "mangle",
chain: "PREROUTING",
rule: []string{
"-m", "comment", "--comment", "AWS, primary ENI",
"-i", "vlan+", "-j", "CONNMARK", "--restore-mark", "--mask", fmt.Sprintf("%#x", n.mainENIMark),
},
})

for _, rule := range iptableRules {
log.Debugf("execute iptable rule : %s", rule.name)

@@ -504,6 +542,13 @@ func containsNoSuchRule(err error) bool {
return false
}

func isRuleExistsError(err error) bool {
if errno, ok := err.(syscall.Errno); ok {
return errno == syscall.EEXIST
}
return false
}

// GetConfigForDebug returns the active values of the configuration env vars (for debugging purposes).
func GetConfigForDebug() map[string]interface{} {
return map[string]interface{}{
@@ -615,8 +660,13 @@ func getConnmark() uint32 {
return defaultConnmark
}

// LinkByMac returns linux netlink based on interface MAC
func LinkByMac(mac string, netLink netlinkwrapper.NetLink, retryInterval time.Duration) (netlink.Link, error) {
// GetLinkByMac returns linux netlink based on interface MAC
func (n *linuxNetwork) GetLinkByMac(mac string, retryInterval time.Duration) (netlink.Link, error) {
return linkByMac(mac, n.netLink, retryInterval)
}

// linkByMac returns linux netlink based on interface MAC
func linkByMac(mac string, netLink netlinkwrapper.NetLink, retryInterval time.Duration) (netlink.Link, error) {
// The adapter might not be immediately available, so we perform retries
var lastErr error
attempt := 0
@@ -629,7 +679,6 @@ func LinkByMac(mac string, netLink netlinkwrapper.NetLink, retryInterval time.Du
}

links, err := netLink.LinkList()

if err != nil {
lastErr = errors.Errorf("%s (attempt %d/%d)", err, attempt, maxAttemptsLinkByMac)
log.Debugf(lastErr.Error())
@@ -664,7 +713,7 @@ func setupENINetwork(eniIP string, eniMAC string, eniTable int, eniSubnetCIDR st

log.Infof("Setting up network for an ENI with IP address %s, MAC address %s, CIDR %s and route table %d",
eniIP, eniMAC, eniSubnetCIDR, eniTable)
link, err := LinkByMac(eniMAC, netLink, retryLinkByMacInterval)
link, err := linkByMac(eniMAC, netLink, retryLinkByMacInterval)
if err != nil {
return errors.Wrapf(err, "setupENINetwork: failed to find the link which uses MAC address %s", eniMAC)
}
@@ -684,7 +733,7 @@ func setupENINetwork(eniIP string, eniMAC string, eniTable int, eniSubnetCIDR st
return errors.Wrapf(err, "setupENINetwork: invalid IPv4 CIDR block %s", eniSubnetCIDR)
}

gw, err := incrementIPv4Addr(ipnet.IP)
gw, err := IncrementIPv4Addr(ipnet.IP)
if err != nil {
return errors.Wrapf(err, "setupENINetwork: failed to define gateway address from %v", ipnet.IP)
}
@@ -774,8 +823,8 @@ func setupENINetwork(eniIP string, eniMAC string, eniTable int, eniSubnetCIDR st
return nil
}

// incrementIPv4Addr returns incremented IPv4 address
func incrementIPv4Addr(ip net.IP) (net.IP, error) {
// IncrementIPv4Addr returns incremented IPv4 address
func IncrementIPv4Addr(ip net.IP) (net.IP, error) {
ip4 := ip.To4()
if ip4 == nil {
return nil, fmt.Errorf("%q is not a valid IPv4 Address", ip)
56 changes: 46 additions & 10 deletions pkg/networkutils/network_test.go
Original file line number Diff line number Diff line change
@@ -86,10 +86,10 @@ func TestSetupENINetwork(t *testing.T) {
eth1 := mock_netlink.NewMockLink(ctrl)
// Emulate a delay attaching the ENI so a retry is necessary
// First attempt gets one links
firstlistSet := mockNetLink.EXPECT().LinkList().Return([]netlink.Link([]netlink.Link{lo}), nil)
firstlistSet := mockNetLink.EXPECT().LinkList().Return([]netlink.Link{lo}, nil)
lo.EXPECT().Attrs().Return(mockLinkAttrs1)
// Second attempt gets both links
secondlistSet := mockNetLink.EXPECT().LinkList().Return([]netlink.Link([]netlink.Link{lo, eth1}), nil)
secondlistSet := mockNetLink.EXPECT().LinkList().Return([]netlink.Link{lo, eth1}, nil)
lo.EXPECT().Attrs().Return(mockLinkAttrs1)
eth1.EXPECT().Attrs().Return(mockLinkAttrs2)
gomock.InOrder(firstlistSet, secondlistSet)
@@ -161,7 +161,7 @@ func TestSetupHostNetworkNodePortDisabled(t *testing.T) {
mockNetLink.EXPECT().RuleDel(&mainENIRule)

var vpcCIDRs []string
err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP)
err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP, false)
assert.NoError(t, err)
}

@@ -287,7 +287,7 @@ func TestSetupHostNetworkNodePortEnabled(t *testing.T) {

var vpcCIDRs []string

err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP)
err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP, false)
assert.NoError(t, err)

assert.Equal(t, map[string]map[string][][]string{
@@ -360,7 +360,7 @@ func TestSetupHostNetworkWithExcludeSNATCIDRs(t *testing.T) {
mockProcSys.EXPECT().Set("net/ipv4/conf/lo/rp_filter", "2").Return(nil)

vpcCIDRs := []string{"10.10.0.0/16", "10.11.0.0/16"}
err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP)
err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP, false)
assert.NoError(t, err)
assert.Equal(t,
map[string]map[string][][]string{
@@ -413,7 +413,7 @@ func TestSetupHostNetworkCleansUpStaleSNATRules(t *testing.T) {
_ = mockIptables.Append("nat", "POSTROUTING", "-m", "comment", "--comment", "AWS SNAT CHAIN", "-j", "AWS-SNAT-CHAIN-0")

vpcCIDRs := []string{"10.10.0.0/16", "10.11.0.0/16"}
err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP)
err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP, false)
assert.NoError(t, err)

assert.Equal(t,
@@ -467,7 +467,7 @@ func TestSetupHostNetworkExcludedSNATCIDRsIdempotent(t *testing.T) {

// remove exclusions
vpcCIDRs := []string{"10.10.0.0/16", "10.11.0.0/16"}
err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP)
err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP, false)
assert.NoError(t, err)

assert.Equal(t,
@@ -512,7 +512,7 @@ func TestSetupHostNetworkMultipleCIDRs(t *testing.T) {
mockProcSys.EXPECT().Set("net/ipv4/conf/lo/rp_filter", "2").Return(nil)

vpcCIDRs := []string{"10.10.0.0/16", "10.11.0.0/16"}
err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP)
err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP, false)
assert.NoError(t, err)
}

@@ -531,7 +531,7 @@ func TestIncrementIPv4Addr(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result, err := incrementIPv4Addr(tc.ip)
result, err := IncrementIPv4Addr(tc.ip)
if tc.err {
assert.Error(t, err)
} else {
@@ -563,7 +563,36 @@ func TestSetupHostNetworkIgnoringRpFilterUpdate(t *testing.T) {
setupNetLinkMocks(ctrl, mockNetLink)

var vpcCIDRs []string
err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP)
err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP, false)
assert.NoError(t, err)
}

// TODO: fix test
func TestSetupHostNetworkUpdateLocalRule(t *testing.T) {
ctrl, mockNetLink, _, mockNS, mockIptables, mockProcSys := setup(t)
defer ctrl.Finish()

ln := &linuxNetwork{
useExternalSNAT: true,
nodePortSupportEnabled: true,
shouldConfigureRpFilter: false,
mainENIMark: defaultConnmark,
mtu: testMTU,

netLink: mockNetLink,
ns: mockNS,
newIptables: func() (iptablesIface, error) {
return mockIptables, nil
},
procSys: mockProcSys,
}
setupNetLinkMocks(ctrl, mockNetLink)
setupVethNetLinkMocks(mockNetLink)

mockNetLink.EXPECT()

var vpcCIDRs []string
err := ln.SetupHostNetwork(vpcCIDRs, loopback, &testENINetIP, true)
assert.NoError(t, err)
}

@@ -577,6 +606,13 @@ func setupNetLinkMocks(ctrl *gomock.Controller, mockNetLink *mock_netlinkwrapper
mockNetLink.EXPECT().RuleAdd(&mainENIRule)
}

func setupVethNetLinkMocks(mockNetLink *mock_netlinkwrapper.MockNetLink) {
var localRule netlink.Rule
mockNetLink.EXPECT().NewRule().Return(&localRule)
mockNetLink.EXPECT().RuleAdd(&localRule)
mockNetLink.EXPECT().RuleDel(&localRule)
}

type mockIptables struct {
// dataplaneState is a map from table name to chain name to slice of rulespecs
dataplaneState map[string]map[string][][]string

0 comments on commit 9073605

Please sign in to comment.