Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry-pick PR-1482 into release-1.8 #1496

Merged
merged 1 commit into from
Jun 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 128 additions & 115 deletions pkg/awsutils/awsutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ type APIs interface {
// FreeENI detaches ENI interface and deletes it
FreeENI(eniName string) error

// TagENI Tags ENI with current tags to contain expected tags.
TagENI(eniID string, currentTags map[string]string) error

// GetAttachedENIs retrieves eni information from instance metadata service
GetAttachedENIs() (eniList []ENIMetadata, err error)

Expand Down Expand Up @@ -169,19 +172,23 @@ type APIs interface {
// EC2InstanceMetadataCache caches instance metadata
type EC2InstanceMetadataCache struct {
// metadata info
securityGroups StringSet
subnetID string
localIPv4 net.IP
instanceID string
instanceType string
primaryENI string
primaryENImac string
availabilityZone string
region string
securityGroups StringSet
subnetID string
localIPv4 net.IP
instanceID string
instanceType string
primaryENI string
primaryENImac string
availabilityZone string
region string

unmanagedENIs StringSet
useCustomNetworking bool
cniunmanagedENIs StringSet

clusterName string
additionalENITags map[string]string

imds TypedIMDS
ec2SVC ec2wrapper.EC2
}
Expand Down Expand Up @@ -328,6 +335,8 @@ func New(useCustomNetworking bool) (*EC2InstanceMetadataCache, error) {

cache := &EC2InstanceMetadataCache{}
cache.imds = TypedIMDS{instrumentedIMDS{ec2Metadata}}
cache.clusterName = os.Getenv(clusterNameEnvVar)
cache.additionalENITags = loadAdditionalENITags()

region, err := ec2Metadata.Region()
if err != nil {
Expand Down Expand Up @@ -627,9 +636,6 @@ func (cache *EC2InstanceMetadataCache) AllocENI(useCustomCfg bool, sg []*string,
return "", errors.Wrap(err, "AllocENI: error attaching ENI")
}

// Once the ENI is attached, tag it.
cache.tagENI(eniID, maxENIBackoffDelay)

// Also change the ENI's attribute so that the ENI will be deleted when the instance is deleted.
attributeInput := &ec2.ModifyNetworkInterfaceAttributeInput{
Attachment: &ec2.NetworkInterfaceAttachmentChanges{
Expand Down Expand Up @@ -682,25 +688,24 @@ func (cache *EC2InstanceMetadataCache) attachENI(eniID string) (string, error) {
// return ENI id, error
func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string, subnet string) (string, error) {
eniDescription := eniDescriptionPrefix + cache.instanceID
//Tag on create with create TS
tags := []*ec2.Tag{
{
Key: aws.String(eniCreatedAtTagKey),
Value: aws.String(time.Now().Format(time.RFC3339)),
},
tags := map[string]string{
eniCreatedAtTagKey: time.Now().Format(time.RFC3339),
}
resourceType := "network-interface"
tagspec := []*ec2.TagSpecification{
for key, value := range cache.buildENITags() {
tags[key] = value
}
tagSpec := []*ec2.TagSpecification{
{
ResourceType: &resourceType,
Tags: tags,
ResourceType: aws.String(ec2.ResourceTypeNetworkInterface),
Tags: convertTagsToSDKTags(tags),
},
}

input := &ec2.CreateNetworkInterfaceInput{
Description: aws.String(eniDescription),
Groups: aws.StringSlice(cache.securityGroups.SortedList()),
SubnetId: aws.String(cache.subnetID),
TagSpecifications: tagspec,
TagSpecifications: tagSpec,
}

if useCustomCfg {
Expand All @@ -715,11 +720,8 @@ func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string
log.Info("Using same config as the primary interface for the new ENI")
}

var sgs []string
for i := range input.Groups {
sgs = append(sgs, *input.Groups[i])
}
log.Infof("Creating ENI with security groups: %v in subnet: %s", sgs, *input.SubnetId)
log.Infof("Creating ENI with security groups: %v in subnet: %s", aws.StringValueSlice(input.Groups), aws.StringValue(input.SubnetId))

start := time.Now()
result, err := cache.ec2SVC.CreateNetworkInterfaceWithContext(context.Background(), input)
awsAPILatency.WithLabelValues("CreateNetworkInterface", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start))
Expand All @@ -732,47 +734,43 @@ func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string
return aws.StringValue(result.NetworkInterface.NetworkInterfaceId), nil
}

func (cache *EC2InstanceMetadataCache) tagENI(eniID string, maxBackoffDelay time.Duration) {
// Tag the ENI with "node.k8s.amazonaws.com/instance_id=<instance_id>"
tags := []*ec2.Tag{
{
Key: aws.String(eniNodeTagKey),
Value: aws.String(cache.instanceID),
},
// buildENITags computes the desired AWS Tags for eni
func (cache *EC2InstanceMetadataCache) buildENITags() map[string]string {
tags := map[string]string{
eniNodeTagKey: cache.instanceID,
}

// If the CLUSTER_NAME env var is present,
// If clusterName is provided,
// tag the ENI with "cluster.k8s.amazonaws.com/name=<cluster_name>"
clusterName := os.Getenv(clusterNameEnvVar)
if clusterName != "" {
tags = append(tags, &ec2.Tag{
Key: aws.String(eniClusterTagKey),
Value: aws.String(clusterName),
})
if cache.clusterName != "" {
tags[eniClusterTagKey] = cache.clusterName
}
for key, value := range cache.additionalENITags {
tags[key] = value
}
return tags
}

//additionalEniTags for adding additional tags on ENI
additionalEniTags := os.Getenv(additionalEniTagsEnvVar)
if additionalEniTags != "" {
tagsMap, err := parseAdditionalEniTagsMap(additionalEniTags)
if err != nil {
log.Warnf("Failed to add additional tags to the newly created ENI %s: %v", eniID, err)
func (cache *EC2InstanceMetadataCache) TagENI(eniID string, currentTags map[string]string) error {
tagChanges := make(map[string]string)
for tagKey, tagValue := range cache.buildENITags() {
if currentTagValue, ok := currentTags[tagKey]; !ok || currentTagValue != tagValue {
tagChanges[tagKey] = tagValue
}
tags = mapToTags(tagsMap, tags)
}

for _, tag := range tags {
log.Debugf("Trying to tag newly created ENI: key=%s, value=%s", aws.StringValue(tag.Key), aws.StringValue(tag.Value))
if len(tagChanges) == 0 {
return nil
}

input := &ec2.CreateTagsInput{
Resources: []*string{
aws.String(eniID),
},
Tags: tags,
Tags: convertTagsToSDKTags(tagChanges),
}

_ = retry.NWithBackoff(retry.NewSimpleBackoff(500*time.Millisecond, maxBackoffDelay, 0.3, 2), 5, func() error {
log.Debugf("Tagging ENI %s with missing tags: %v", eniID, tagChanges)
return retry.NWithBackoff(retry.NewSimpleBackoff(500*time.Millisecond, maxENIBackoffDelay, 0.3, 2), 5, func() error {
start := time.Now()
_, err := cache.ec2SVC.CreateTagsWithContext(context.Background(), input)
awsAPILatency.WithLabelValues("CreateTags", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start))
Expand All @@ -786,38 +784,6 @@ func (cache *EC2InstanceMetadataCache) tagENI(eniID string, maxBackoffDelay time
})
}

//parseAdditionalEniTagsMap will create a map for additional tags
func parseAdditionalEniTagsMap(additionalEniTags string) (map[string]string, error) {
var additionalEniTagsMap map[string]string

// If duplicate keys exist, the value of the key will be the value of latter key.
err := json.Unmarshal([]byte(additionalEniTags), &additionalEniTagsMap)
if err != nil {
log.Errorf("Invalid format for ADDITIONAL_ENI_TAGS. Expected a json hash: %v", err)
}
return additionalEniTagsMap, err
}

// MapToTags converts a map to a slice of tags.
func mapToTags(tagsMap map[string]string, tags []*ec2.Tag) []*ec2.Tag {
if tagsMap == nil {
return tags
}

for key, value := range tagsMap {
keyPrefix := reservedTagKeyPrefix
if strings.Contains(key, keyPrefix) {
log.Warnf("Additional tags has %s prefix. Ignoring %s tag as it is reserved", keyPrefix, key)
continue
}
tags = append(tags, &ec2.Tag{
Key: aws.String(key),
Value: aws.String(value),
})
}
return tags
}

// containsPrivateIPAddressLimitExceededError returns whether exceeds ENI's IP address limit
func containsPrivateIPAddressLimitExceededError(err error) bool {
if aerr, ok := err.(awserr.Error); ok {
Expand Down Expand Up @@ -1072,10 +1038,7 @@ func (cache *EC2InstanceMetadataCache) DescribeAllENIs() (DescribeAllENIsResult,
}
// Check IPv4 addresses
logOutOfSyncState(eniID, eniMetadata.IPv4Addresses, ec2res.PrivateIpAddresses)
tags := getTags(ec2res, eniMetadata.ENIID)
if len(tags) > 0 {
tagMap[eniMetadata.ENIID] = tags
}
tagMap[eniMetadata.ENIID] = convertSDKTagsToTags(ec2res.TagSet)
}
return DescribeAllENIsResult{
ENIMetadata: verifiedENIs,
Expand All @@ -1086,17 +1049,59 @@ func (cache *EC2InstanceMetadataCache) DescribeAllENIs() (DescribeAllENIsResult,
}, nil
}

// getTags collects tags from an EC2 DescribeNetworkInterfaces call
func getTags(ec2res *ec2.NetworkInterface, eniID string) map[string]string {
tags := make(map[string]string, len(ec2res.TagSet))
for _, tag := range ec2res.TagSet {
if tag.Key == nil || tag.Value == nil {
log.Errorf("nil tag on ENI: %v", eniID)
continue
// convertTagsToSDKTags converts tags in stringMap format to AWS SDK format
func convertTagsToSDKTags(tagsMap map[string]string) []*ec2.Tag {
if len(tagsMap) == 0 {
return nil
}

sdkTags := make([]*ec2.Tag, 0, len(tagsMap))
for _, key := range sets.StringKeySet(tagsMap).List() {
sdkTags = append(sdkTags, &ec2.Tag{
Key: aws.String(key),
Value: aws.String(tagsMap[key]),
})
}
return sdkTags
}

// convertSDKTagsToTags converts tags in AWS SDKs format to stringMap format
func convertSDKTagsToTags(sdkTags []*ec2.Tag) map[string]string {
if len(sdkTags) == 0 {
return nil
}

tagsMap := make(map[string]string, len(sdkTags))
for _, sdkTag := range sdkTags {
tagsMap[aws.StringValue(sdkTag.Key)] = aws.StringValue(sdkTag.Value)
}
return tagsMap
}

// loadAdditionalENITags will load the additional ENI Tags from environment variables.
func loadAdditionalENITags() map[string]string {
additionalENITagsStr := os.Getenv(additionalEniTagsEnvVar)
if additionalENITagsStr == "" {
return nil
}

// TODO: ideally we should fail in CNI init phase if the validation fails instead of warn.
// currently we only warn to be backwards-compatible and keep changes minimal in this version.

var additionalENITags map[string]string
// If duplicate keys exist, the value of the key will be the value of latter key.
err := json.Unmarshal([]byte(additionalENITagsStr), &additionalENITags)
if err != nil {
log.Warnf("failed to parse additional ENI Tags from env %v due to %v", additionalEniTagsEnvVar, err)
return nil
}
for key := range additionalENITags {
if strings.Contains(key, reservedTagKeyPrefix) {
log.Warnf("ignoring tagKey %v from additional ENI Tags as it contains reserved prefix %v", key, reservedTagKeyPrefix)
delete(additionalENITags, key)
}
tags[*tag.Key] = *tag.Value
}
return tags
return additionalENITags
}

var eniErrorMessageRegex = regexp.MustCompile("'([a-zA-Z0-9-]+)'")
Expand Down Expand Up @@ -1338,7 +1343,7 @@ func (cache *EC2InstanceMetadataCache) cleanUpLeakedENIsInternal(startupDelay ti
time.Sleep(startupDelay)

log.Debug("Checking for leaked AWS CNI ENIs.")
networkInterfaces, err := cache.getFilteredListOfNetworkInterfaces()
networkInterfaces, err := cache.getLeakedENIs()
if err != nil {
log.Warnf("Unable to get leaked ENIs: %v", err)
} else {
Expand Down Expand Up @@ -1388,26 +1393,34 @@ func (cache *EC2InstanceMetadataCache) tagENIcreateTS(eniID string, maxBackoffDe
})
}

// getFilteredListOfNetworkInterfaces calls DescribeNetworkInterfaces to get all available ENIs that were allocated by
// getLeakedENIs calls DescribeNetworkInterfaces to get all available ENIs that were allocated by
// the AWS CNI plugin, but were not deleted.
func (cache *EC2InstanceMetadataCache) getFilteredListOfNetworkInterfaces() ([]*ec2.NetworkInterface, error) {
// The tag key has to be "node.k8s.amazonaws.com/instance_id"
tagFilter := &ec2.Filter{
Name: aws.String("tag-key"),
Values: []*string{
aws.String(eniNodeTagKey),
func (cache *EC2InstanceMetadataCache) getLeakedENIs() ([]*ec2.NetworkInterface, error) {
leakedENIFilters := []*ec2.Filter{
{
Name: aws.String("tag-key"),
Values: []*string{
aws.String(eniNodeTagKey),
},
},
}
// Only fetch "available" ENIs.
statusFilter := &ec2.Filter{
Name: aws.String("status"),
Values: []*string{
aws.String("available"),
{
Name: aws.String("status"),
Values: []*string{
aws.String(ec2.NetworkInterfaceStatusAvailable),
},
},
}
if cache.clusterName != "" {
leakedENIFilters = append(leakedENIFilters, &ec2.Filter{
Name: aws.String(fmt.Sprintf("tag:%s", eniClusterTagKey)),
Values: []*string{
aws.String(cache.clusterName),
},
})
}

input := &ec2.DescribeNetworkInterfacesInput{
Filters: []*ec2.Filter{tagFilter, statusFilter},
Filters: leakedENIFilters,
MaxResults: aws.Int64(describeENIPageSize),
}

Expand All @@ -1418,7 +1431,7 @@ func (cache *EC2InstanceMetadataCache) getFilteredListOfNetworkInterfaces() ([]*
return nil
}
// Check that it's not a newly created ENI
tags := getTags(networkInterface, aws.StringValue(networkInterface.NetworkInterfaceId))
tags := convertSDKTagsToTags(networkInterface.TagSet)

if value, ok := tags[eniCreatedAtTagKey]; ok {
parsedTime, err := time.Parse(time.RFC3339, value)
Expand Down
Loading