-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit cc77672
Showing
15 changed files
with
703 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
resource "aws_sns_topic" "lifecycle" { | ||
name = var.name | ||
kms_master_key_id = var.sns_topic_kms_key_id | ||
tags = var.extra_tags | ||
} | ||
|
||
resource "aws_sns_topic_subscription" "lifecycle" { | ||
topic_arn = aws_sns_topic.lifecycle.arn | ||
protocol = "lambda" | ||
endpoint = module.lambda.this_lambda_function_arn | ||
} | ||
|
||
module "lambda" { | ||
source = "terraform-aws-modules/lambda/aws" | ||
version = "1.28.0" | ||
|
||
function_name = "${var.name}-lifecycle" | ||
|
||
handler = var.lambda_handler | ||
source_path = var.lambda_source_path | ||
runtime = var.lambda_runtime | ||
timeout = var.lambda_timeout | ||
kms_key_arn = var.kms_key_arn | ||
reserved_concurrent_executions = var.reserved_concurrent_executions | ||
|
||
# If publish is disabled, there will be "Error adding new Lambda Permission for notify_slack: InvalidParameterValueException: We currently do not support adding policies for $LATEST." | ||
publish = true | ||
|
||
environment_variables = var.lambda_environment_variables | ||
|
||
create_role = var.lambda_role == "" | ||
lambda_role = var.lambda_role | ||
role_name = "${var.name}-lifecycle" | ||
role_permissions_boundary = var.iam_role_boundary_policy_arn | ||
role_tags = var.iam_role_tags | ||
|
||
attach_network_policy = var.lambda_function_vpc_subnet_ids != null | ||
|
||
allowed_triggers = { | ||
AllowExecutionFromSNS = { | ||
principal = "sns.amazonaws.com" | ||
source_arn = aws_sns_topic.lifecycle.arn | ||
} | ||
} | ||
|
||
store_on_s3 = var.lambda_function_store_on_s3 | ||
s3_bucket = var.lambda_function_s3_bucket | ||
|
||
vpc_subnet_ids = var.lambda_function_vpc_subnet_ids | ||
vpc_security_group_ids = var.lambda_function_vpc_security_group_ids | ||
|
||
tags = var.extra_tags | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
# Launching Instance | ||
resource "aws_autoscaling_lifecycle_hook" "lifecycle_launching" { | ||
name = "${var.name}-launching" | ||
autoscaling_group_name = var.autoscaling_group_name | ||
default_result = var.default_result["launching"] | ||
heartbeat_timeout = var.heartbeat_timeout["launching"] | ||
lifecycle_transition = "autoscaling:EC2_INSTANCE_LAUNCHING" | ||
notification_metadata = var.notification_metadata["launching"] | ||
notification_target_arn = aws_sns_topic.lifecycle.arn | ||
role_arn = aws_iam_role.lifecycle.arn | ||
} | ||
|
||
# Terminating Instance | ||
resource "aws_autoscaling_lifecycle_hook" "lifecycle_terminating" { | ||
name = "${var.name}-terminating" | ||
autoscaling_group_name = var.autoscaling_group_name | ||
default_result = var.default_result["terminating"] | ||
heartbeat_timeout = var.heartbeat_timeout["terminating"] | ||
lifecycle_transition = "autoscaling:EC2_INSTANCE_TERMINATING" | ||
notification_metadata = var.notification_metadata["terminating"] | ||
notification_target_arn = aws_sns_topic.lifecycle.arn | ||
role_arn = aws_iam_role.lifecycle.arn | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
import boto3 | ||
import json | ||
import logging | ||
import os.path | ||
import time | ||
|
||
from botocore.signers import RequestSigner | ||
from kubernetes import client as k8s_client | ||
from kubernetes import config as k8s_config | ||
from kubernetes.client.rest import ApiException | ||
|
||
from k8s_utils import (abandon_lifecycle_action, cordon_node, node_exists, node_ready, remove_all_pods) | ||
|
||
logger = logging.getLogger(__name__) | ||
logger.setLevel(logging.DEBUG) | ||
|
||
KUBE_FILEPATH = '/tmp/kubeconfig' | ||
REGION = os.environ['AWS_REGION'] | ||
|
||
ec2 = boto3.client('ec2', region_name=REGION) | ||
asg = boto3.client('autoscaling', region_name=REGION) | ||
s3 = boto3.client('s3', region_name=REGION) | ||
|
||
def k8s_api(): | ||
|
||
env = { | ||
'cluster_name': os.environ.get('CLUSTER_NAME'), | ||
'kube_config_bucket': os.environ.get('KUBE_CONFIG_BUCKET'), | ||
'kube_config_object': os.environ.get('KUBE_CONFIG_OBJECT') | ||
} | ||
|
||
kube_config_bucket = env['kube_config_bucket'] | ||
cluster_name = env['cluster_name'] | ||
|
||
if not os.path.exists(KUBE_FILEPATH): | ||
if kube_config_bucket: | ||
logger.info('No kubeconfig file found. Downloading...') | ||
s3.download_file(kube_config_bucket, env['kube_config_object'], KUBE_FILEPATH) | ||
else: | ||
logger.info('No kubeconfig file found.') | ||
|
||
k8s_config.load_kube_config(KUBE_FILEPATH) | ||
|
||
return k8s_client.CoreV1Api() | ||
|
||
def launch_node(k8s_api, auto_scaling_group_name, lifecycle_hook_name, instance_id, node_name, timeout): | ||
|
||
waiting_timeout = time.time() + timeout | ||
|
||
while True: | ||
if time.time() > waiting_timeout: | ||
logger.exception('timeout waiting for node {} launch'.format(node_name)) | ||
break | ||
try: | ||
if node_exists(k8s_api, node_name): | ||
if node_ready(k8s_api, node_name): | ||
logger.info('K8s node {} is ready'.format(node_name)) | ||
break | ||
|
||
time.sleep(10) | ||
except ApiException: | ||
logger.exception('There was an error waiting the node {} ready'.format(node_name)) | ||
abandon_lifecycle_action(asg, auto_scaling_group_name, lifecycle_hook_name, instance_id) | ||
break | ||
|
||
def terminate_node(k8s_api, auto_scaling_group_name, lifecycle_hook_name, instance_id, node_name, timeout): | ||
|
||
try: | ||
if not node_exists(k8s_api, node_name): | ||
logger.error('Node not found.') | ||
abandon_lifecycle_action(asg, auto_scaling_group_name, lifecycle_hook_name, instance_id) | ||
return | ||
|
||
cordon_node(k8s_api, node_name) | ||
|
||
remove_all_pods(k8s_api, node_name) | ||
|
||
asg.complete_lifecycle_action(LifecycleHookName=lifecycle_hook_name, | ||
AutoScalingGroupName=auto_scaling_group_name, | ||
LifecycleActionResult='CONTINUE', | ||
InstanceId=instance_id) | ||
except ApiException: | ||
logger.exception('There was an error removing the pods from the node {}'.format(node_name)) | ||
abandon_lifecycle_action(asg, auto_scaling_group_name, lifecycle_hook_name, instance_id) | ||
|
||
def lambda_handler(event, context): | ||
|
||
k8s_api_client = k8s_api() | ||
|
||
logger.info(event) | ||
|
||
# process asg lifecycle hooks | ||
for record in event['Records']: | ||
|
||
lifecycle_hook_name = '' | ||
lfiecycle_transition = '' | ||
auto_scaling_group_name = '' | ||
instance_id = '' | ||
node_name = '' | ||
instance_lifecycle = '' | ||
|
||
hook_payload = json.loads(record['Sns']['Message']) | ||
|
||
# initial variable from hook payload | ||
if 'LifecycleTransition' not in hook_payload: | ||
continue | ||
else: | ||
lifecycle_hook_name = hook_payload['LifecycleHookName'] | ||
auto_scaling_group_name = hook_payload['AutoScalingGroupName'] | ||
lfiecycle_transition = hook_payload['LifecycleTransition'] | ||
instance_id = hook_payload['EC2InstanceId'] | ||
instance = ec2.describe_instances(InstanceIds=[instance_id])['Reservations'][0]['Instances'][0] | ||
node_name = instance['PrivateDnsName'] | ||
instance_lifecycle = instance['InstanceLifecycle'] | ||
|
||
|
||
logger.info("Processing %s event from auto scaling group %s, and the instance id is %s, private dns name is %s" % (lfiecycle_transition, auto_scaling_group_name, instance_id, node_name)) | ||
|
||
# execute specific action for lifecycle hook | ||
if lfiecycle_transition == 'autoscaling:EC2_INSTANCE_LAUNCHING': | ||
timeout = float(os.environ.get('LAUNCHING_TIMEOUT')) | ||
launch_node(k8s_api_client, auto_scaling_group_name, lifecycle_hook_name, instance_id, node_name, timeout) | ||
|
||
elif lfiecycle_transition == 'autoscaling:EC2_INSTANCE_TERMINATING': | ||
timeout = float(os.environ.get('TERMINATING_TIMEOUT')) | ||
terminate_node(k8s_api_client, auto_scaling_group_name, lifecycle_hook_name, instance_id, node_name, timeout) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
import logging | ||
import time | ||
|
||
from kubernetes.client.rest import ApiException | ||
|
||
logger = logging.getLogger(__name__) | ||
logger.setLevel(logging.DEBUG) | ||
|
||
MIRROR_POD_ANNOTATION_KEY = "kubernetes.io/config.mirror" | ||
CONTROLLER_KIND_DAEMON_SET = "DaemonSet" | ||
|
||
|
||
def cordon_node(api, node_name): | ||
"""Marks the specified node as unschedulable, which means that no new pods can be launched on the | ||
node by the Kubernetes scheduler. | ||
""" | ||
patch_body = { | ||
'apiVersion': 'v1', | ||
'kind': 'Node', | ||
'metadata': { | ||
'name': node_name | ||
}, | ||
'spec': { | ||
'unschedulable': True | ||
} | ||
} | ||
|
||
api.patch_node(node_name, patch_body) | ||
|
||
|
||
def remove_all_pods(api, node_name, poll=5): | ||
"""Removes all Kubernetes pods from the specified node.""" | ||
pods = get_evictable_pods(api, node_name) | ||
|
||
logger.debug('Number of pods to delete: ' + str(len(pods))) | ||
|
||
evict_until_completed(api, pods, poll) | ||
wait_until_empty(api, node_name, poll) | ||
|
||
|
||
def pod_is_evictable(pod): | ||
if pod.metadata.annotations is not None and pod.metadata.annotations.get(MIRROR_POD_ANNOTATION_KEY): | ||
logger.info("Skipping mirror pod {}/{}".format(pod.metadata.namespace, pod.metadata.name)) | ||
return False | ||
if pod.metadata.owner_references is None: | ||
return True | ||
for ref in pod.metadata.owner_references: | ||
if ref.controller is not None and ref.controller: | ||
if ref.kind == CONTROLLER_KIND_DAEMON_SET: | ||
logger.info("Skipping DaemonSet {}/{}".format(pod.metadata.namespace, pod.metadata.name)) | ||
return False | ||
return True | ||
|
||
|
||
def get_evictable_pods(api, node_name): | ||
field_selector = 'spec.nodeName=' + node_name | ||
pods = api.list_pod_for_all_namespaces(watch=False, field_selector=field_selector) | ||
return [pod for pod in pods.items if pod_is_evictable(pod)] | ||
|
||
|
||
def evict_until_completed(api, pods, poll): | ||
pending = pods | ||
while True: | ||
pending = evict_pods(api, pending) | ||
if (len(pending)) <= 0: | ||
return | ||
time.sleep(poll) | ||
|
||
|
||
def evict_pods(api, pods): | ||
remaining = [] | ||
for pod in pods: | ||
logger.info('Evicting pod {} in namespace {}'.format(pod.metadata.name, pod.metadata.namespace)) | ||
body = { | ||
'apiVersion': 'policy/v1beta1', | ||
'kind': 'Eviction', | ||
'deleteOptions': {}, | ||
'metadata': { | ||
'name': pod.metadata.name, | ||
'namespace': pod.metadata.namespace | ||
} | ||
} | ||
try: | ||
api.create_namespaced_pod_eviction(pod.metadata.name, pod.metadata.namespace, body) | ||
except ApiException as err: | ||
if err.status == 429: | ||
remaining.append(pod) | ||
logger.warning("Pod {}/{} could not be evicted due to disruption budget. Will retry.".format(pod.metadata.namespace, pod.metadata.name)) | ||
else: | ||
logger.exception("Unexpected error adding eviction for pod {}/{}".format(pod.metadata.namespace, pod.metadata.name)) | ||
except: | ||
logger.exception("Unexpected error adding eviction for pod {}/{}".format(pod.metadata.namespace, pod.metadata.name)) | ||
return remaining | ||
|
||
|
||
def wait_until_empty(api, node_name, poll): | ||
logger.info("Waiting for evictions to complete") | ||
while True: | ||
pods = get_evictable_pods(api, node_name) | ||
if len(pods) <= 0: | ||
logger.info("All pods evicted successfully") | ||
return | ||
logger.debug("Still waiting for deletion of the following pods: {}".format(", ".join(map(lambda pod: pod.metadata.namespace + "/" + pod.metadata.name, pods)))) | ||
time.sleep(poll) | ||
|
||
|
||
def node_ready(api, node_name): | ||
"""Determines whether the specified node is ready.""" | ||
field_selector = 'metadata.name=' + node_name | ||
node = api.list_node(pretty=True, field_selector=field_selector).items[0] | ||
|
||
for condition in node.status.conditions: | ||
if condition.type == 'Ready': | ||
return condition.status | ||
|
||
def node_exists(api, node_name): | ||
"""Determines whether the specified node is still part of the cluster.""" | ||
nodes = api.list_node(pretty=True).items | ||
node = next((n for n in nodes if n.metadata.name == node_name), None) | ||
return False if not node else True | ||
|
||
|
||
def abandon_lifecycle_action(asg_client, auto_scaling_group_name, lifecycle_hook_name, instance_id): | ||
"""Completes the lifecycle action with the ABANDON result, which stops any remaining actions, | ||
such as other lifecycle hooks. | ||
""" | ||
asg_client.complete_lifecycle_action(LifecycleHookName=lifecycle_hook_name, | ||
AutoScalingGroupName=auto_scaling_group_name, | ||
LifecycleActionResult='ABANDON', | ||
InstanceId=instance_id) | ||
|
||
# to-do: wait for node ready | ||
|
||
# to-do: add node label for Kubernetes version >= 1.19 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
# | ||
# These requirements were autogenerated by pipenv | ||
# To regenerate from the project's Pipfile, run: | ||
# | ||
# pipenv lock --requirements | ||
# | ||
|
||
-i https://pypi.org/simple | ||
boto3==1.16.63 | ||
botocore==1.19.63 | ||
cachetools==4.2.1; python_version ~= '3.5' | ||
certifi==2020.12.5 | ||
chardet==4.0.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' | ||
google-auth==1.24.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5' | ||
idna==2.10; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' | ||
jmespath==0.10.0; python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3' | ||
kubernetes==12.0.1 | ||
oauthlib==3.1.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' | ||
pyasn1-modules==0.2.8 | ||
pyasn1==0.4.8 | ||
python-dateutil==2.8.1; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' | ||
pyyaml==5.4.1 | ||
requests-oauthlib==1.3.0 | ||
requests==2.25.1; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' | ||
rsa==4.7; python_version >= '3.6' | ||
s3transfer==0.3.4 | ||
six==1.15.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' | ||
urllib3==1.26.3; python_version != '3.4' | ||
websocket-client==0.57.0 |
Oops, something went wrong.