Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sqs deletion issue 360 #364

Merged
merged 6 commits into from
Feb 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/monitor/sqsevent/asg-lifecycle-event.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type LifecycleDetail struct {
LifecycleTransition string `json:"LifecycleTransition"`
}

func (m SQSMonitor) asgTerminationToInterruptionEvent(event EventBridgeEvent, messages []*sqs.Message) (monitor.InterruptionEvent, error) {
func (m SQSMonitor) asgTerminationToInterruptionEvent(event EventBridgeEvent, message *sqs.Message) (monitor.InterruptionEvent, error) {
lifecycleDetail := &LifecycleDetail{}
err := json.Unmarshal(event.Detail, lifecycleDetail)
if err != nil {
Expand Down Expand Up @@ -94,7 +94,7 @@ func (m SQSMonitor) asgTerminationToInterruptionEvent(event EventBridgeEvent, me
log.Info().Msgf("Completed ASG Lifecycle Hook (%s) for instance %s",
lifecycleDetail.LifecycleHookName,
lifecycleDetail.EC2InstanceID)
errs := m.deleteMessages(messages)
errs := m.deleteMessages([]*sqs.Message{message})
if errs != nil {
return errs[0]
}
Expand All @@ -111,7 +111,7 @@ func (m SQSMonitor) asgTerminationToInterruptionEvent(event EventBridgeEvent, me

if nodeName == "" {
log.Info().Msg("Node name is empty, assuming instance was already terminated, deleting queue message")
errs := m.deleteMessages(messages)
errs := m.deleteMessages([]*sqs.Message{message})
if errs != nil {
log.Warn().Errs("errors", errs).Msg("There was an error deleting the messages")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/monitor/sqsevent/ec2-state-change-event.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type EC2StateChangeDetail struct {

const instanceStatesToDrain = "stopping,stopped,shutting-down,terminated"

func (m SQSMonitor) ec2StateChangeToInterruptionEvent(event EventBridgeEvent, messages []*sqs.Message) (monitor.InterruptionEvent, error) {
func (m SQSMonitor) ec2StateChangeToInterruptionEvent(event EventBridgeEvent, message *sqs.Message) (monitor.InterruptionEvent, error) {
ec2StateChangeDetail := &EC2StateChangeDetail{}
err := json.Unmarshal(event.Detail, ec2StateChangeDetail)
if err != nil {
Expand All @@ -75,7 +75,7 @@ func (m SQSMonitor) ec2StateChangeToInterruptionEvent(event EventBridgeEvent, me
Description: fmt.Sprintf("EC2 State Change event received. Instance went into %s at %s \n", ec2StateChangeDetail.State, event.getTime()),
}
interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
errs := m.deleteMessages([]*sqs.Message{messages[0]})
errs := m.deleteMessages([]*sqs.Message{message})
if errs != nil {
return errs[0]
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/monitor/sqsevent/rebalance-recommendation-event.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type RebalanceRecommendationDetail struct {
InstanceID string `json:"instance-id"`
}

func (m SQSMonitor) rebalanceRecommendationToInterruptionEvent(event EventBridgeEvent, messages []*sqs.Message) (monitor.InterruptionEvent, error) {
func (m SQSMonitor) rebalanceRecommendationToInterruptionEvent(event EventBridgeEvent, message *sqs.Message) (monitor.InterruptionEvent, error) {
rebalanceRecDetail := &RebalanceRecommendationDetail{}
err := json.Unmarshal(event.Detail, rebalanceRecDetail)
if err != nil {
Expand All @@ -67,7 +67,7 @@ func (m SQSMonitor) rebalanceRecommendationToInterruptionEvent(event EventBridge
Description: fmt.Sprintf("Rebalance recommendation event received. Instance will be cordoned at %s \n", event.getTime()),
}
interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
errs := m.deleteMessages(messages)
errs := m.deleteMessages([]*sqs.Message{message})
if errs != nil {
return errs[0]
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/monitor/sqsevent/spot-itn-event.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type SpotInterruptionDetail struct {
InstanceAction string `json:"instance-action"`
}

func (m SQSMonitor) spotITNTerminationToInterruptionEvent(event EventBridgeEvent, messages []*sqs.Message) (monitor.InterruptionEvent, error) {
func (m SQSMonitor) spotITNTerminationToInterruptionEvent(event EventBridgeEvent, message *sqs.Message) (monitor.InterruptionEvent, error) {
spotInterruptionDetail := &SpotInterruptionDetail{}
err := json.Unmarshal(event.Detail, spotInterruptionDetail)
if err != nil {
Expand All @@ -69,7 +69,7 @@ func (m SQSMonitor) spotITNTerminationToInterruptionEvent(event EventBridgeEvent
Description: fmt.Sprintf("Spot Interruption event received. Instance will be interrupted at %s \n", event.getTime()),
}
interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
errs := m.deleteMessages([]*sqs.Message{messages[0]})
errs := m.deleteMessages([]*sqs.Message{message})
if errs != nil {
return errs[0]
}
Expand Down
63 changes: 38 additions & 25 deletions pkg/monitor/sqsevent/sqs-monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,35 +56,48 @@ func (m SQSMonitor) Kind() string {

// Monitor continuously monitors SQS for events and sends interruption events to the passed in channel
func (m SQSMonitor) Monitor() error {
interruptionEvent, err := m.checkForSQSMessage()
log.Debug().Msg("Checking for queue messages")
messages, err := m.receiveQueueMessages(m.QueueURL)
if err != nil {
if errors.Is(err, ErrNodeStateNotRunning) {
log.Warn().Err(err).Msg("dropping event for an already terminated node")
return nil
}
return err
}
if interruptionEvent != nil && interruptionEvent.Kind == SQSTerminateKind {
log.Debug().Msgf("Sending %s interruption event to the interruption channel", SQSTerminateKind)
m.InterruptionChan <- *interruptionEvent
}
return nil
}

// checkForSpotInterruptionNotice checks sqs for new messages and returns interruption events
func (m SQSMonitor) checkForSQSMessage() (*monitor.InterruptionEvent, error) {
failedEvents := 0
for _, message := range messages {
interruptionEvent, err := m.processSQSMessage(message)
switch {
case errors.Is(err, ErrNodeStateNotRunning):
// If the node is no longer running, just log and delete the message. If message deletion fails, count it as an error.
log.Warn().Err(err).Msg("dropping event for an already terminated node")
errs := m.deleteMessages([]*sqs.Message{message})
if len(errs) > 0 {
log.Warn().Err(errs[0]).Msg("error deleting event for already terminated node")
failedEvents++
}

log.Debug().Msg("Checking for queue messages")
messages, err := m.receiveQueueMessages(m.QueueURL)
if err != nil {
return nil, err
case err != nil:
// Log errors and record as failed events
log.Warn().Err(err).Msg("ignoring event due to error")
failedEvents++

case err == nil && interruptionEvent != nil && interruptionEvent.Kind == SQSTerminateKind:
// Successfully processed SQS message into a SQSTerminateKind interruption event
log.Debug().Msgf("Sending %s interruption event to the interruption channel", SQSTerminateKind)
m.InterruptionChan <- *interruptionEvent
}
}
if len(messages) == 0 {
return nil, nil

if len(messages) > 0 && failedEvents == len(messages) {
return fmt.Errorf("All of the waiting queue events could not be processed")
}

return nil
}

// processSQSMessage checks sqs for new messages and returns interruption events
func (m SQSMonitor) processSQSMessage(message *sqs.Message) (*monitor.InterruptionEvent, error) {
event := EventBridgeEvent{}
err = json.Unmarshal([]byte(*messages[0].Body), &event)
err := json.Unmarshal([]byte(*message.Body), &event)
if err != nil {
return nil, err
}
Expand All @@ -93,17 +106,17 @@ func (m SQSMonitor) checkForSQSMessage() (*monitor.InterruptionEvent, error) {

switch event.Source {
case "aws.autoscaling":
interruptionEvent, err = m.asgTerminationToInterruptionEvent(event, messages)
interruptionEvent, err = m.asgTerminationToInterruptionEvent(event, message)
if err != nil {
return nil, err
}
case "aws.ec2":
if event.DetailType == "EC2 Instance State-change Notification" {
interruptionEvent, err = m.ec2StateChangeToInterruptionEvent(event, messages)
interruptionEvent, err = m.ec2StateChangeToInterruptionEvent(event, message)
} else if event.DetailType == "EC2 Spot Instance Interruption Warning" {
interruptionEvent, err = m.spotITNTerminationToInterruptionEvent(event, messages)
interruptionEvent, err = m.spotITNTerminationToInterruptionEvent(event, message)
} else if event.DetailType == "EC2 Instance Rebalance Recommendation" {
interruptionEvent, err = m.rebalanceRecommendationToInterruptionEvent(event, messages)
interruptionEvent, err = m.rebalanceRecommendationToInterruptionEvent(event, message)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -140,7 +153,7 @@ func (m SQSMonitor) receiveQueueMessages(qURL string) ([]*sqs.Message, error) {
aws.String(sqs.QueueAttributeNameAll),
},
QueueUrl: &qURL,
MaxNumberOfMessages: aws.Int64(2),
MaxNumberOfMessages: aws.Int64(5),
VisibilityTimeout: aws.Int64(20), // 20 seconds
WaitTimeSeconds: aws.Int64(0),
})
Expand Down
Loading