Skip to content

Commit

Permalink
multithreaded event processor
Browse files Browse the repository at this point in the history
This replaces the single-process execution of events with parallel processing, solving the issue that happens when NTH is busy / blocked (retrying to evict) and will eventually fail to process events for other nodes going down at the same time.

Example:
3 nodes roll at a time because of batchSize or spot interruption.
A deployment has a PDB limit of maxUnavailable of 1 - that will block NTH in an eviction retry loop and it will miss the third node eviction.

The number of workers is capped to prevent a memory runaway
  • Loading branch information
universam1 committed Dec 23, 2020
1 parent a85c4fd commit 583e2b0
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 46 deletions.
97 changes: 53 additions & 44 deletions cmd/node-termination-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,16 @@ func main() {
// Exit interruption loop if a SIGTERM is received or the channel is closed
break
default:
drainOrCordonIfNecessary(interruptionEventStore, *node, nthConfig, nodeMetadata, metrics)
for event, ok := interruptionEventStore.GetActiveEvent(); ok && !event.InProgress; event, ok = interruptionEventStore.GetActiveEvent() {
select {
case interruptionEventStore.Workers <- 1:
event.InProgress = true
go drainOrCordonIfNecessary(interruptionEventStore, event, *node, nthConfig, nodeMetadata, metrics)
default:
log.Warn().Msg("all workers busy, waiting")
break
}
}
}
}
log.Log().Msg("AWS Node Termination Handler is shutting down")
Expand Down Expand Up @@ -254,59 +263,59 @@ func watchForCancellationEvents(cancelChan <-chan monitor.InterruptionEvent, int
}
}

func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Store, node node.Node, nthConfig config.Config, nodeMetadata ec2metadata.NodeMetadata, metrics observability.Metrics) {
if drainEvent, ok := interruptionEventStore.GetActiveEvent(); ok {
nodeName := drainEvent.NodeName
if drainEvent.PreDrainTask != nil {
err := drainEvent.PreDrainTask(*drainEvent, node)
if err != nil {
log.Log().Err(err).Msg("There was a problem executing the pre-drain task")
}
metrics.NodeActionsInc("pre-drain", nodeName, err)
func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Store, drainEvent *monitor.InterruptionEvent, node node.Node, nthConfig config.Config, nodeMetadata ec2metadata.NodeMetadata, metrics observability.Metrics) {
nodeName := drainEvent.NodeName
if drainEvent.PreDrainTask != nil {
err := drainEvent.PreDrainTask(*drainEvent, node)
if err != nil {
log.Log().Err(err).Msg("There was a problem executing the pre-drain task")
}
metrics.NodeActionsInc("pre-drain", nodeName, err)
}

if nthConfig.CordonOnly || drainEvent.IsRebalanceRecommendation() {
err := node.Cordon(nodeName)
if err != nil {
if errors.IsNotFound(err) {
log.Warn().Err(err).Msgf("node '%s' not found in the cluster", nodeName)
} else {
log.Log().Err(err).Msg("There was a problem while trying to cordon the node")
os.Exit(1)
}
if nthConfig.CordonOnly || drainEvent.IsRebalanceRecommendation() {
err := node.Cordon(nodeName)
if err != nil {
if errors.IsNotFound(err) {
log.Warn().Err(err).Msgf("node '%s' not found in the cluster", nodeName)
} else {
log.Log().Str("node_name", nodeName).Msg("Node successfully cordoned")
err = node.LogPods(nodeName)
if err != nil {
log.Log().Err(err).Msg("There was a problem while trying to log all pod names on the node")
}
metrics.NodeActionsInc("cordon", nodeName, err)
log.Log().Err(err).Msg("There was a problem while trying to cordon the node")
os.Exit(1)
}
} else {
err := node.CordonAndDrain(nodeName)
log.Log().Str("node_name", nodeName).Msg("Node successfully cordoned")
err = node.LogPods(nodeName)
if err != nil {
if errors.IsNotFound(err) {
log.Warn().Err(err).Msgf("node '%s' not found in the cluster", nodeName)
} else {
log.Log().Err(err).Msg("There was a problem while trying to cordon and drain the node")
os.Exit(1)
}
log.Log().Err(err).Msg("There was a problem while trying to log all pod names on the node")
}
metrics.NodeActionsInc("cordon", nodeName, err)
}
} else {
err := node.CordonAndDrain(nodeName)
if err != nil {
if errors.IsNotFound(err) {
log.Warn().Err(err).Msgf("node '%s' not found in the cluster", nodeName)
} else {
log.Log().Str("node_name", nodeName).Msg("Node successfully cordoned and drained")
metrics.NodeActionsInc("cordon-and-drain", nodeName, err)
log.Log().Err(err).Msg("There was a problem while trying to cordon and drain the node")
os.Exit(1)
}
} else {
log.Log().Str("node_name", nodeName).Msg("Node successfully cordoned and drained")
metrics.NodeActionsInc("cordon-and-drain", nodeName, err)
}
}

interruptionEventStore.MarkAllAsDrained(nodeName)
if nthConfig.WebhookURL != "" {
webhook.Post(nodeMetadata, drainEvent, nthConfig)
}
if drainEvent.PostDrainTask != nil {
err := drainEvent.PostDrainTask(*drainEvent, node)
if err != nil {
log.Err(err).Msg("There was a problem executing the post-drain task")
}
metrics.NodeActionsInc("post-drain", nodeName, err)
interruptionEventStore.MarkAllAsDrained(nodeName)
if nthConfig.WebhookURL != "" {
webhook.Post(nodeMetadata, drainEvent, nthConfig)
}
if drainEvent.PostDrainTask != nil {
err := drainEvent.PostDrainTask(*drainEvent, node)
if err != nil {
log.Err(err).Msg("There was a problem executing the post-drain task")
}
metrics.NodeActionsInc("post-drain", nodeName, err)
}
<-interruptionEventStore.Workers

}
2 changes: 1 addition & 1 deletion config/helm/aws-node-termination-handler/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ Parameter | Description | Default
`podMonitor.sampleLimit` | Number of scraped samples accepted | `5000`
`podMonitor.labels` | Additional PodMonitor metadata labels | `{}`


### AWS Node Termination Handler - Queue-Processor Mode Configuration

Parameter | Description | Default
Expand All @@ -89,6 +88,7 @@ Parameter | Description | Default
`awsRegion` | If specified, use the AWS region for AWS API calls, else NTH will try to find the region through AWS_REGION env var, IMDS, or the specified queue URL | ``
`checkASGTagBeforeDraining` | If true, check that the instance is tagged with "aws-node-termination-handler/managed" as the key before draining the node | `true`
`managedAsgTag` | The tag to ensure is on a node if checkASGTagBeforeDraining is true | `aws-node-termination-handler/managed`
`workers` | The maximum number of parallel event processors | `10`

### AWS Node Termination Handler - IMDS Mode Configuration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ spec:
value: {{ .Values.checkASGTagBeforeDraining | quote }}
- name: MANAGED_ASG_TAG
value: {{ .Values.managedAsgTag | quote }}
- name: WORKERS
value: {{ .Values.workers | quote }}
resources:
{{- toYaml .Values.resources | nindent 12 }}
{{- if .Values.enablePrometheusServer }}
Expand Down
3 changes: 3 additions & 0 deletions config/helm/aws-node-termination-handler/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,6 @@ windowsUpdateStrategy: ""
# If you have disabled IMDSv1 and are relying on IMDSv2, you'll need to increase the IP hop count to 2 before switching this to false
# https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
useHostNetwork: true

# The maximum number of parallel event processors to handle concurrent events
workers: 10
4 changes: 4 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ const (
logLevelDefault = "INFO"
uptimeFromFileConfigKey = "UPTIME_FROM_FILE"
uptimeFromFileDefault = ""
workersConfigKey = "WORKERS"
workersDefault = 10
// prometheus
enablePrometheusDefault = false
enablePrometheusConfigKey = "ENABLE_PROMETHEUS_SERVER"
Expand Down Expand Up @@ -116,6 +118,7 @@ type Config struct {
AWSRegion string
AWSEndpoint string
QueueURL string
Workers int
AWSSession *session.Session
}

Expand Down Expand Up @@ -162,6 +165,7 @@ func ParseCliArgs() (config Config, err error) {
flag.StringVar(&config.AWSRegion, "aws-region", getEnv(awsRegionConfigKey, ""), "If specified, use the AWS region for AWS API calls")
flag.StringVar(&config.AWSEndpoint, "aws-endpoint", getEnv(awsEndpointConfigKey, ""), "[testing] If specified, use the AWS endpoint to make API calls")
flag.StringVar(&config.QueueURL, "queue-url", getEnv(queueURLConfigKey, ""), "Listens for messages on the specified SQS queue URL")
flag.IntVar(&config.Workers, "workers", getIntEnv(workersConfigKey, workersDefault), "The amount of parallel event processors.")

flag.Parse()

Expand Down
2 changes: 2 additions & 0 deletions pkg/interruptioneventstore/interruption-event-store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Store struct {
interruptionEventStore map[string]*monitor.InterruptionEvent
ignoredEvents map[string]struct{}
atLeastOneEvent bool
Workers chan int
}

// New Creates a new interruption event store
Expand All @@ -38,6 +39,7 @@ func New(nthConfig config.Config) *Store {
NthConfig: nthConfig,
interruptionEventStore: make(map[string]*monitor.InterruptionEvent),
ignoredEvents: make(map[string]struct{}),
Workers: make(chan int, nthConfig.Workers),
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/monitor/sqsevent/sqs-monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (m SQSMonitor) retrieveNodeName(instanceID string) (string, error) {
}
// anything except running might not contain PrivateDnsName
if state != ec2.InstanceStateNameRunning {
return "", ErrNodeStateNotRunning
return "", fmt.Errorf("node: '%s' in state '%s': %w", instanceID, state, ErrNodeStateNotRunning)
}
return "", fmt.Errorf("unable to retrieve PrivateDnsName name for '%s' in state '%s'", instanceID, state)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/monitor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type InterruptionEvent struct {
StartTime time.Time
EndTime time.Time
Drained bool
InProgress bool
PreDrainTask DrainTask `json:"-"`
PostDrainTask DrainTask `json:"-"`
}
Expand Down

0 comments on commit 583e2b0

Please sign in to comment.