Skip to content

Commit

Permalink
Add AWS Health event support for QP mode
Browse files Browse the repository at this point in the history
  • Loading branch information
AustinSiu committed Oct 14, 2021
1 parent 6a22485 commit b312b87
Show file tree
Hide file tree
Showing 5 changed files with 421 additions and 34 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/exporters/metric/prometheus v0.20.0
go.opentelemetry.io/otel/metric v0.20.0
go.uber.org/multierr v1.7.0
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a // indirect
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
golang.org/x/sys v0.0.0-20210608053332-aa57babbf139
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -608,8 +608,12 @@ go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5/go.mod h1:nmDLcffg48OtT/PSW0H
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec=
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
Expand Down Expand Up @@ -916,6 +920,8 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
gotest.tools/v3 v3.0.3 h1:4AuOwCGf4lLR9u3YOe2awrHygurzhO/HeQ6laiA6Sx0=
gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
Expand Down
124 changes: 124 additions & 0 deletions pkg/monitor/sqsevent/health-maintenance-event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package sqsevent

import (
"encoding/json"
"fmt"
"strings"

"github.com/aws/aws-node-termination-handler/pkg/monitor"
"github.com/aws/aws-node-termination-handler/pkg/node"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/rs/zerolog/log"
)

/* Example AWS Health Scheduled Maintenance EC2 Event:
{
"version": "0",
"id": "7fb65329-1628-4cf3-a740-95fg457h1402",
"detail-type": "AWS Health Event",
"source": "aws.health",
"account": "account id",
"time": "2016-06-05T06:27:57Z",
"region": "us-east-1",
"resources": ["i-12345678"],
"detail": {
"eventArn": "arn:aws:health:region::event/id",
"service": "EC2",
"eventTypeCode": "AWS_EC2_DEDICATED_HOST_NETWORK_MAINTENANCE_SCHEDULED",
"eventTypeCategory": "scheduledChange",
"startTime": "Sat, 05 Jun 2016 15:10:09 GMT",
"eventDescription": [{
"language": "en_US",
"latestDescription": "A description of the event will be provided here"
}],
"affectedEntities": [{
"entityValue": "i-12345678",
"tags": {
"stage": "prod",
"app": "my-app"
}
}]
}
}
*/

// AffectedEntity holds information about an entity that is affected by a Health event
type AffectedEntity struct {
EntityValue string `json:"entityValue"`
}

// ScheduledMaintenanceDetail holds the event details for AWS Health scheduled EC2 change events from Amazon EventBridge
type ScheduledMaintenanceDetail struct {
EventTypeCategory string `json:"eventTypeCategory"`
Service string `json:"service"`
AffectedEntities []AffectedEntity `json:"affectedEntities"`
}

const supportedEventCategoryTypes = "scheduledChange"

func (m SQSMonitor) maintenanceNoticeToInterruptionEvents(event EventBridgeEvent, message *sqs.Message) ([]InterruptionEventWrapper, error) {
scheduledMaintenanceDetail := &ScheduledMaintenanceDetail{}
err := json.Unmarshal(event.Detail, scheduledMaintenanceDetail)
if err != nil {
return nil, err
}

if scheduledMaintenanceDetail.Service != "EC2" {
return nil, fmt.Errorf("Amazon EventBridge events for service (%s) are not supported", scheduledMaintenanceDetail.Service)
}

if !strings.Contains(supportedEventCategoryTypes, scheduledMaintenanceDetail.EventTypeCategory) {
return nil, fmt.Errorf("Amazon EventBridge events with EventTypeCategory (%s) are not supported", scheduledMaintenanceDetail.EventTypeCategory)
}

interruptionEventWrappers := make([]InterruptionEventWrapper, len(event.Resources))

for i, affectedEntity := range scheduledMaintenanceDetail.AffectedEntities {
nodeName, err := m.retrieveNodeName(affectedEntity.EntityValue)
if err != nil {
interruptionEventWrappers[i] = InterruptionEventWrapper{nil, err}
continue
}
asgName, _ := m.retrieveAutoScalingGroupName(affectedEntity.EntityValue)
interruptionEvent := monitor.InterruptionEvent{
EventID: fmt.Sprintf("aws-health-maintenance-event-%x", event.ID),
Kind: SQSTerminateKind,
AutoScalingGroupName: asgName,
StartTime: event.getTime(),
NodeName: nodeName,
InstanceID: affectedEntity.EntityValue,
Description: fmt.Sprintf("AWS Health maintenance event received. Instance %s will be interrupted at %s \n", affectedEntity.EntityValue, event.getTime()),
}
interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
errs := m.deleteMessages([]*sqs.Message{message})
if errs != nil {
return errs[0]
}
return nil
}
interruptionEvent.PreDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
err := n.TaintSpotItn(interruptionEvent.NodeName, interruptionEvent.EventID)
if err != nil {
log.Err(err).Msgf("Unable to taint node with taint %s:%s", node.SpotInterruptionTaint, interruptionEvent.EventID)
}
return nil
}

interruptionEventWrappers[i] = InterruptionEventWrapper{&interruptionEvent, nil}
}

return interruptionEventWrappers, nil
}
109 changes: 75 additions & 34 deletions pkg/monitor/sqsevent/sqs-monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ type SQSMonitor struct {
ManagedAsgTag string
}

// Convenience wrapper for handling a pair of an interruption event and a related error
type InterruptionEventWrapper struct {
InterruptionEvent *monitor.InterruptionEvent
Err error
}

// Kind denotes the kind of event that is processed
func (m SQSMonitor) Kind() string {
return SQSTerminateKind
Expand All @@ -63,53 +69,70 @@ func (m SQSMonitor) Monitor() error {
return err
}

failedEvents := 0
failedQueueEventsCount := 0
for _, message := range messages {
interruptionEvent, err := m.processSQSMessage(message)
switch {
case errors.Is(err, ErrNodeStateNotRunning):
// If the node is no longer running, just log and delete the message. If message deletion fails, count it as an error.
log.Warn().Err(err).Msg("dropping event for an already terminated node")
errs := m.deleteMessages([]*sqs.Message{message})
if len(errs) > 0 {
log.Err(errs[0]).Msg("error deleting event for already terminated node")
failedEvents++
}
interruptionEventWrappers, err := m.processSQSMessage(message)
if err != nil {
log.Err(err).Msg("ignoring SQS message due to error while processing")
failedQueueEventsCount++
continue
}
failedInterruptionEventsCount := 0
for i, eventWrapper := range interruptionEventWrappers {
switch {
case errors.Is(eventWrapper.Err, ErrNodeStateNotRunning):
// If the node is no longer running, just log and delete the message. If message deletion fails, count it as an error.
log.Warn().Err(eventWrapper.Err).Msg("dropping interruption event for an already terminated node")
if (i == len(interruptionEventWrappers)-1) && (failedInterruptionEventsCount == len(interruptionEventWrappers)-1) {
// Log that all events failed, and delete the message from the queue
log.Warn().Err(eventWrapper.Err).Msg("all interruption events failed drop, moving to delete message from queue")
deletionErrs := m.deleteMessages([]*sqs.Message{message})
if len(deletionErrs) > 0 {
log.Err(deletionErrs[0]).Msg("error deleting queue event for already terminated node(s)")
failedInterruptionEventsCount++
}
}

case err != nil:
// Log errors and record as failed events
log.Err(err).Msg("ignoring event due to error")
failedEvents++
case eventWrapper.Err != nil:
// Log errors and record as failed events
log.Err(eventWrapper.Err).Msg("ignoring interruption event due to error")
failedInterruptionEventsCount++

case err == nil && interruptionEvent != nil && interruptionEvent.Kind == SQSTerminateKind:
// Successfully processed SQS message into a SQSTerminateKind interruption event
log.Debug().Msgf("Sending %s interruption event to the interruption channel", SQSTerminateKind)
m.InterruptionChan <- *interruptionEvent
case eventWrapper.Err == nil && eventWrapper.InterruptionEvent != nil && eventWrapper.InterruptionEvent.Kind == SQSTerminateKind:
// Successfully processed SQS message into a SQSTerminateKind interruption event
log.Debug().Msgf("Sending %s interruption event to the interruption channel", SQSTerminateKind)
m.InterruptionChan <- *eventWrapper.InterruptionEvent
}
}
if failedInterruptionEventsCount == len(interruptionEventWrappers) {
failedQueueEventsCount++
}
}

if len(messages) > 0 && failedEvents == len(messages) {
if len(messages) > 0 && failedQueueEventsCount == len(messages) {
return fmt.Errorf("none of the waiting queue events could be processed")
}

return nil
}

// processSQSMessage checks sqs for new messages and returns interruption events
func (m SQSMonitor) processSQSMessage(message *sqs.Message) (*monitor.InterruptionEvent, error) {
func (m SQSMonitor) processSQSMessage(message *sqs.Message) ([]InterruptionEventWrapper, error) {
event := EventBridgeEvent{}
err := json.Unmarshal([]byte(*message.Body), &event)
if err != nil {
return nil, err
}

interruptionEventWrappers := []InterruptionEventWrapper{}
interruptionEvent := monitor.InterruptionEvent{}

switch event.Source {
case "aws.autoscaling":
interruptionEvent, err = m.asgTerminationToInterruptionEvent(event, message)
if err != nil {
return nil, err
return append(interruptionEventWrappers, InterruptionEventWrapper{&interruptionEvent, err}), nil

}
case "aws.ec2":
if event.DetailType == "EC2 Instance State-change Notification" {
Expand All @@ -120,28 +143,46 @@ func (m SQSMonitor) processSQSMessage(message *sqs.Message) (*monitor.Interrupti
interruptionEvent, err = m.rebalanceRecommendationToInterruptionEvent(event, message)
}
if err != nil {
return nil, err
return append(interruptionEventWrappers, InterruptionEventWrapper{&interruptionEvent, err}), nil
}
case "aws.health":
if event.DetailType == "AWS Health Event" {
interruptionEventWrappers, err = m.maintenanceNoticeToInterruptionEvents(event, message)
}
if err != nil {
return append(interruptionEventWrappers, InterruptionEventWrapper{&interruptionEvent, err}), nil
}
default:
return nil, fmt.Errorf("Event source (%s) is not supported", event.Source)
return nil, fmt.Errorf("event source (%s) is not supported", event.Source)
}

// Bail if empty event is returned after parsing
if interruptionEvent.EventID == "" {
return nil, nil
if interruptionEvent.EventID != "" {
interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{&interruptionEvent, err})
}

if m.CheckIfManaged {
isManaged, err := m.isInstanceManaged(interruptionEvent.InstanceID)
if err != nil {
return &interruptionEvent, err
// override problematic events
for i := range interruptionEventWrappers {
// Bail if empty event is returned after parsing
if interruptionEventWrappers[i].InterruptionEvent.EventID == "" {
interruptionEventWrappers[i].InterruptionEvent = nil
interruptionEventWrappers[i].Err = nil
continue
}
if !isManaged {
return nil, nil

if m.CheckIfManaged {
isManaged, err := m.isInstanceManaged(interruptionEventWrappers[i].InterruptionEvent.InstanceID)
if err != nil {
interruptionEventWrappers[i].Err = err
continue
}
if !isManaged {
interruptionEventWrappers[i].InterruptionEvent = nil
interruptionEventWrappers[i].Err = nil
}
}
}

return &interruptionEvent, err
return interruptionEventWrappers, err
}

// receiveQueueMessages checks the configured SQS queue for new messages
Expand Down
Loading

0 comments on commit b312b87

Please sign in to comment.