Skip to content

Commit

Permalink
Refine ENI Tagging logic (#1482) (#1496)
Browse files Browse the repository at this point in the history
1. All ENIs created by IPAM-D will be tagged with all desired tags on creation.
2. All ENIs managed by IPAM-D will be tagged with all desired tags if not already tagged with these tags. Other tags on the ENI will be kept as they are.
   1. Trunk ENI is excluded, as its lifecycle is managed by vpc-resource-controller.
   2. Since we tag ENIs on ENI creation, this backfill logic will only trigger for below cases:
       * ENIs created by previous versions of IPAM-D, when the desired tag set has since changed.
       * ENIs attached to instances from external source without the node.k8s.amazonaws.com/no_manage tag.
3. The desired tag set is:
   * node.k8s.amazonaws.com/instance_id: <instance_id>
   * cluster.k8s.amazonaws.com/name: <cluster-name> if CLUSTER_NAME envVar is specified.
   * additional tags specified if ADDITIONAL_ENI_TAGS envVar is specified.
  • Loading branch information
M00nF1sh authored Jun 7, 2021
1 parent c605851 commit 1390229
Show file tree
Hide file tree
Showing 5 changed files with 991 additions and 287 deletions.
243 changes: 128 additions & 115 deletions pkg/awsutils/awsutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ type APIs interface {
// FreeENI detaches ENI interface and deletes it
FreeENI(eniName string) error

// TagENI tags the ENI with every expected tag that is missing from, or has a different value in, currentTags.
TagENI(eniID string, currentTags map[string]string) error

// GetAttachedENIs retrieves eni information from instance metadata service
GetAttachedENIs() (eniList []ENIMetadata, err error)

Expand Down Expand Up @@ -169,19 +172,23 @@ type APIs interface {
// EC2InstanceMetadataCache caches instance metadata
type EC2InstanceMetadataCache struct {
// metadata info
securityGroups StringSet
subnetID string
localIPv4 net.IP
instanceID string
instanceType string
primaryENI string
primaryENImac string
availabilityZone string
region string
securityGroups StringSet
subnetID string
localIPv4 net.IP
instanceID string
instanceType string
primaryENI string
primaryENImac string
availabilityZone string
region string

unmanagedENIs StringSet
useCustomNetworking bool
cniunmanagedENIs StringSet

clusterName string
additionalENITags map[string]string

imds TypedIMDS
ec2SVC ec2wrapper.EC2
}
Expand Down Expand Up @@ -328,6 +335,8 @@ func New(useCustomNetworking bool) (*EC2InstanceMetadataCache, error) {

cache := &EC2InstanceMetadataCache{}
cache.imds = TypedIMDS{instrumentedIMDS{ec2Metadata}}
cache.clusterName = os.Getenv(clusterNameEnvVar)
cache.additionalENITags = loadAdditionalENITags()

region, err := ec2Metadata.Region()
if err != nil {
Expand Down Expand Up @@ -627,9 +636,6 @@ func (cache *EC2InstanceMetadataCache) AllocENI(useCustomCfg bool, sg []*string,
return "", errors.Wrap(err, "AllocENI: error attaching ENI")
}

// Once the ENI is attached, tag it.
cache.tagENI(eniID, maxENIBackoffDelay)

// Also change the ENI's attribute so that the ENI will be deleted when the instance is deleted.
attributeInput := &ec2.ModifyNetworkInterfaceAttributeInput{
Attachment: &ec2.NetworkInterfaceAttachmentChanges{
Expand Down Expand Up @@ -682,25 +688,24 @@ func (cache *EC2InstanceMetadataCache) attachENI(eniID string) (string, error) {
// return ENI id, error
func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string, subnet string) (string, error) {
eniDescription := eniDescriptionPrefix + cache.instanceID
//Tag on create with create TS
tags := []*ec2.Tag{
{
Key: aws.String(eniCreatedAtTagKey),
Value: aws.String(time.Now().Format(time.RFC3339)),
},
tags := map[string]string{
eniCreatedAtTagKey: time.Now().Format(time.RFC3339),
}
resourceType := "network-interface"
tagspec := []*ec2.TagSpecification{
for key, value := range cache.buildENITags() {
tags[key] = value
}
tagSpec := []*ec2.TagSpecification{
{
ResourceType: &resourceType,
Tags: tags,
ResourceType: aws.String(ec2.ResourceTypeNetworkInterface),
Tags: convertTagsToSDKTags(tags),
},
}

input := &ec2.CreateNetworkInterfaceInput{
Description: aws.String(eniDescription),
Groups: aws.StringSlice(cache.securityGroups.SortedList()),
SubnetId: aws.String(cache.subnetID),
TagSpecifications: tagspec,
TagSpecifications: tagSpec,
}

if useCustomCfg {
Expand All @@ -715,11 +720,8 @@ func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string
log.Info("Using same config as the primary interface for the new ENI")
}

var sgs []string
for i := range input.Groups {
sgs = append(sgs, *input.Groups[i])
}
log.Infof("Creating ENI with security groups: %v in subnet: %s", sgs, *input.SubnetId)
log.Infof("Creating ENI with security groups: %v in subnet: %s", aws.StringValueSlice(input.Groups), aws.StringValue(input.SubnetId))

start := time.Now()
result, err := cache.ec2SVC.CreateNetworkInterfaceWithContext(context.Background(), input)
awsAPILatency.WithLabelValues("CreateNetworkInterface", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start))
Expand All @@ -732,47 +734,43 @@ func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string
return aws.StringValue(result.NetworkInterface.NetworkInterfaceId), nil
}

func (cache *EC2InstanceMetadataCache) tagENI(eniID string, maxBackoffDelay time.Duration) {
// Tag the ENI with "node.k8s.amazonaws.com/instance_id=<instance_id>"
tags := []*ec2.Tag{
{
Key: aws.String(eniNodeTagKey),
Value: aws.String(cache.instanceID),
},
// buildENITags computes the desired AWS Tags for eni
func (cache *EC2InstanceMetadataCache) buildENITags() map[string]string {
tags := map[string]string{
eniNodeTagKey: cache.instanceID,
}

// If the CLUSTER_NAME env var is present,
// If clusterName is provided,
// tag the ENI with "cluster.k8s.amazonaws.com/name=<cluster_name>"
clusterName := os.Getenv(clusterNameEnvVar)
if clusterName != "" {
tags = append(tags, &ec2.Tag{
Key: aws.String(eniClusterTagKey),
Value: aws.String(clusterName),
})
if cache.clusterName != "" {
tags[eniClusterTagKey] = cache.clusterName
}
for key, value := range cache.additionalENITags {
tags[key] = value
}
return tags
}

//additionalEniTags for adding additional tags on ENI
additionalEniTags := os.Getenv(additionalEniTagsEnvVar)
if additionalEniTags != "" {
tagsMap, err := parseAdditionalEniTagsMap(additionalEniTags)
if err != nil {
log.Warnf("Failed to add additional tags to the newly created ENI %s: %v", eniID, err)
func (cache *EC2InstanceMetadataCache) TagENI(eniID string, currentTags map[string]string) error {
tagChanges := make(map[string]string)
for tagKey, tagValue := range cache.buildENITags() {
if currentTagValue, ok := currentTags[tagKey]; !ok || currentTagValue != tagValue {
tagChanges[tagKey] = tagValue
}
tags = mapToTags(tagsMap, tags)
}

for _, tag := range tags {
log.Debugf("Trying to tag newly created ENI: key=%s, value=%s", aws.StringValue(tag.Key), aws.StringValue(tag.Value))
if len(tagChanges) == 0 {
return nil
}

input := &ec2.CreateTagsInput{
Resources: []*string{
aws.String(eniID),
},
Tags: tags,
Tags: convertTagsToSDKTags(tagChanges),
}

_ = retry.NWithBackoff(retry.NewSimpleBackoff(500*time.Millisecond, maxBackoffDelay, 0.3, 2), 5, func() error {
log.Debugf("Tagging ENI %s with missing tags: %v", eniID, tagChanges)
return retry.NWithBackoff(retry.NewSimpleBackoff(500*time.Millisecond, maxENIBackoffDelay, 0.3, 2), 5, func() error {
start := time.Now()
_, err := cache.ec2SVC.CreateTagsWithContext(context.Background(), input)
awsAPILatency.WithLabelValues("CreateTags", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start))
Expand All @@ -786,38 +784,6 @@ func (cache *EC2InstanceMetadataCache) tagENI(eniID string, maxBackoffDelay time
})
}

// parseAdditionalEniTagsMap decodes additionalEniTags, which is expected to be
// a JSON object of string keys and string values, into a map.
//
// When decoding fails the error is logged and returned together with whatever
// json.Unmarshal left in the map; callers should check the error before using
// the map.
func parseAdditionalEniTagsMap(additionalEniTags string) (map[string]string, error) {
	var tagsByKey map[string]string

	// encoding/json keeps the last value when the same key appears more than once.
	if err := json.Unmarshal([]byte(additionalEniTags), &tagsByKey); err != nil {
		log.Errorf("Invalid format for ADDITIONAL_ENI_TAGS. Expected a json hash: %v", err)
		return tagsByKey, err
	}
	return tagsByKey, nil
}

// mapToTags appends one SDK tag for each entry of tagsMap to tags and returns
// the extended slice. Entries whose key contains reservedTagKeyPrefix are
// skipped with a warning. A nil or empty tagsMap leaves tags unchanged.
func mapToTags(tagsMap map[string]string, tags []*ec2.Tag) []*ec2.Tag {
	// Ranging over a nil map performs no iterations, so no explicit nil guard is needed.
	for tagKey, tagValue := range tagsMap {
		if strings.Contains(tagKey, reservedTagKeyPrefix) {
			log.Warnf("Additional tags has %s prefix. Ignoring %s tag as it is reserved", reservedTagKeyPrefix, tagKey)
			continue
		}
		sdkTag := &ec2.Tag{
			Key:   aws.String(tagKey),
			Value: aws.String(tagValue),
		}
		tags = append(tags, sdkTag)
	}
	return tags
}

// containsPrivateIPAddressLimitExceededError returns whether exceeds ENI's IP address limit
func containsPrivateIPAddressLimitExceededError(err error) bool {
if aerr, ok := err.(awserr.Error); ok {
Expand Down Expand Up @@ -1072,10 +1038,7 @@ func (cache *EC2InstanceMetadataCache) DescribeAllENIs() (DescribeAllENIsResult,
}
// Check IPv4 addresses
logOutOfSyncState(eniID, eniMetadata.IPv4Addresses, ec2res.PrivateIpAddresses)
tags := getTags(ec2res, eniMetadata.ENIID)
if len(tags) > 0 {
tagMap[eniMetadata.ENIID] = tags
}
tagMap[eniMetadata.ENIID] = convertSDKTagsToTags(ec2res.TagSet)
}
return DescribeAllENIsResult{
ENIMetadata: verifiedENIs,
Expand All @@ -1086,17 +1049,59 @@ func (cache *EC2InstanceMetadataCache) DescribeAllENIs() (DescribeAllENIsResult,
}, nil
}

// getTags collects tags from an EC2 DescribeNetworkInterfaces call
func getTags(ec2res *ec2.NetworkInterface, eniID string) map[string]string {
tags := make(map[string]string, len(ec2res.TagSet))
for _, tag := range ec2res.TagSet {
if tag.Key == nil || tag.Value == nil {
log.Errorf("nil tag on ENI: %v", eniID)
continue
// convertTagsToSDKTags turns a key/value tag map into the []*ec2.Tag form used
// by the AWS SDK. It returns nil when tagsMap is nil or empty. Tags are emitted
// in sorted key order so the resulting slice is deterministic.
func convertTagsToSDKTags(tagsMap map[string]string) []*ec2.Tag {
	if len(tagsMap) == 0 {
		return nil
	}

	// List() yields the keys sorted, which fixes the output order.
	sortedKeys := sets.StringKeySet(tagsMap).List()
	result := make([]*ec2.Tag, len(sortedKeys))
	for i, tagKey := range sortedKeys {
		result[i] = &ec2.Tag{
			Key:   aws.String(tagKey),
			Value: aws.String(tagsMap[tagKey]),
		}
	}
	return result
}

// convertSDKTagsToTags turns AWS SDK tags into a plain key/value map. It
// returns a nil map when sdkTags is nil or empty. A nil Key or Value pointer is
// read as the empty string via aws.StringValue; if a key occurs more than once,
// the last occurrence wins.
func convertSDKTagsToTags(sdkTags []*ec2.Tag) map[string]string {
	var result map[string]string
	if count := len(sdkTags); count > 0 {
		result = make(map[string]string, count)
		for i := range sdkTags {
			tagKey := aws.StringValue(sdkTags[i].Key)
			result[tagKey] = aws.StringValue(sdkTags[i].Value)
		}
	}
	return result
}

// loadAdditionalENITags will load the additional ENI Tags from environment variables.
func loadAdditionalENITags() map[string]string {
additionalENITagsStr := os.Getenv(additionalEniTagsEnvVar)
if additionalENITagsStr == "" {
return nil
}

// TODO: ideally we should fail in CNI init phase if the validation fails instead of warn.
// currently we only warn to be backwards-compatible and keep changes minimal in this version.

var additionalENITags map[string]string
// If duplicate keys exist, the value of the key will be the value of latter key.
err := json.Unmarshal([]byte(additionalENITagsStr), &additionalENITags)
if err != nil {
log.Warnf("failed to parse additional ENI Tags from env %v due to %v", additionalEniTagsEnvVar, err)
return nil
}
for key := range additionalENITags {
if strings.Contains(key, reservedTagKeyPrefix) {
log.Warnf("ignoring tagKey %v from additional ENI Tags as it contains reserved prefix %v", key, reservedTagKeyPrefix)
delete(additionalENITags, key)
}
tags[*tag.Key] = *tag.Value
}
return tags
return additionalENITags
}

var eniErrorMessageRegex = regexp.MustCompile("'([a-zA-Z0-9-]+)'")
Expand Down Expand Up @@ -1338,7 +1343,7 @@ func (cache *EC2InstanceMetadataCache) cleanUpLeakedENIsInternal(startupDelay ti
time.Sleep(startupDelay)

log.Debug("Checking for leaked AWS CNI ENIs.")
networkInterfaces, err := cache.getFilteredListOfNetworkInterfaces()
networkInterfaces, err := cache.getLeakedENIs()
if err != nil {
log.Warnf("Unable to get leaked ENIs: %v", err)
} else {
Expand Down Expand Up @@ -1388,26 +1393,34 @@ func (cache *EC2InstanceMetadataCache) tagENIcreateTS(eniID string, maxBackoffDe
})
}

// getFilteredListOfNetworkInterfaces calls DescribeNetworkInterfaces to get all available ENIs that were allocated by
// getLeakedENIs calls DescribeNetworkInterfaces to get all available ENIs that were allocated by
// the AWS CNI plugin, but were not deleted.
func (cache *EC2InstanceMetadataCache) getFilteredListOfNetworkInterfaces() ([]*ec2.NetworkInterface, error) {
// The tag key has to be "node.k8s.amazonaws.com/instance_id"
tagFilter := &ec2.Filter{
Name: aws.String("tag-key"),
Values: []*string{
aws.String(eniNodeTagKey),
func (cache *EC2InstanceMetadataCache) getLeakedENIs() ([]*ec2.NetworkInterface, error) {
leakedENIFilters := []*ec2.Filter{
{
Name: aws.String("tag-key"),
Values: []*string{
aws.String(eniNodeTagKey),
},
},
}
// Only fetch "available" ENIs.
statusFilter := &ec2.Filter{
Name: aws.String("status"),
Values: []*string{
aws.String("available"),
{
Name: aws.String("status"),
Values: []*string{
aws.String(ec2.NetworkInterfaceStatusAvailable),
},
},
}
if cache.clusterName != "" {
leakedENIFilters = append(leakedENIFilters, &ec2.Filter{
Name: aws.String(fmt.Sprintf("tag:%s", eniClusterTagKey)),
Values: []*string{
aws.String(cache.clusterName),
},
})
}

input := &ec2.DescribeNetworkInterfacesInput{
Filters: []*ec2.Filter{tagFilter, statusFilter},
Filters: leakedENIFilters,
MaxResults: aws.Int64(describeENIPageSize),
}

Expand All @@ -1418,7 +1431,7 @@ func (cache *EC2InstanceMetadataCache) getFilteredListOfNetworkInterfaces() ([]*
return nil
}
// Check that it's not a newly created ENI
tags := getTags(networkInterface, aws.StringValue(networkInterface.NetworkInterfaceId))
tags := convertSDKTagsToTags(networkInterface.TagSet)

if value, ok := tags[eniCreatedAtTagKey]; ok {
parsedTime, err := time.Parse(time.RFC3339, value)
Expand Down
Loading

0 comments on commit 1390229

Please sign in to comment.