Skip to content

Commit

Permalink
Merge pull request #1 from getamis/feature/support_k8s_master_role
Browse files Browse the repository at this point in the history
support k8s master role
  • Loading branch information
smalltown authored Feb 16, 2021
2 parents cc77672 + 5934c03 commit 9db3f39
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 87 deletions.
128 changes: 60 additions & 68 deletions modules/kubernetes/functions/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from kubernetes import config as k8s_config
from kubernetes.client.rest import ApiException

from k8s_utils import (abandon_lifecycle_action, cordon_node, node_exists, node_ready, remove_all_pods)
from k8s_utils import (abandon_lifecycle_action, continue_lifecycle_action, cordon_node, node_exists, node_ready, append_node_labels, master_ready, remove_all_pods)

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
Expand All @@ -19,108 +19,100 @@

ec2 = boto3.client('ec2', region_name=REGION)
asg = boto3.client('autoscaling', region_name=REGION)
s3 = boto3.client('s3', region_name=REGION)
elb = boto3.client('elb', region_name=REGION)
s3 = boto3.client('s3', region_name=REGION)

def hook_init(hook_payload):
    """Initialize a Kubernetes API client and collect lifecycle-hook metadata.

    Reads cluster configuration from environment variables, enriches it with
    details from the ASG lifecycle-hook SNS payload and the EC2 instance
    record, downloads the kubeconfig from S3 when it is not already on disk,
    and returns a (CoreV1Api client, hook_info dict) pair.
    """

    # Static configuration supplied to the lambda via environment variables.
    hook_info = {
        'cluster_name': os.environ.get('CLUSTER_NAME'),
        'kube_config_bucket': os.environ.get('KUBE_CONFIG_BUCKET'),
        'kube_config_object': os.environ.get('KUBE_CONFIG_OBJECT'),
        'node_role': os.environ.get('KUBERNETES_NODE_ROLE'),
        'launching_timeout': float(os.environ.get('LAUNCHING_TIMEOUT')),
        'terminating_timeout': float(os.environ.get('TERMINATING_TIMEOUT'))
    }

    # Per-event details from the lifecycle-hook payload.
    hook_info['name'] = hook_payload['LifecycleHookName']
    hook_info['asg_name'] = hook_payload['AutoScalingGroupName']
    hook_info['transition'] = hook_payload['LifecycleTransition']
    hook_info['instance_id'] = hook_payload['EC2InstanceId']

    instance = ec2.describe_instances(InstanceIds=[hook_info['instance_id']])['Reservations'][0]['Instances'][0]

    # The K8s node name is the instance's private DNS name; spot instances
    # carry an 'InstanceLifecycle' key in the EC2 description.
    hook_info['node_name'] = instance['PrivateDnsName']
    hook_info['instance_lifecycle'] = 'Ec2Spot' if 'InstanceLifecycle' in instance else 'OnDemand'

    logger.info("Processing %s event from auto scaling group %s, and the instance id is %s, private dns name is %s" % (hook_info['transition'], hook_info['asg_name'], hook_info['instance_id'], hook_info['node_name']))

    # Fetch the kubeconfig from S3 on cold start only (the file persists for
    # the lifetime of the lambda container).
    if not os.path.exists(KUBE_FILEPATH):
        if hook_info['kube_config_bucket']:
            logger.info('No kubeconfig file found. Downloading...')
            s3.download_file(hook_info['kube_config_bucket'], hook_info['kube_config_object'], KUBE_FILEPATH)
        else:
            logger.info('No kubeconfig file found.')

    k8s_config.load_kube_config(KUBE_FILEPATH)

    return k8s_client.CoreV1Api(), hook_info

def launch_node(k8s_api, hook_info):
    """Handle the EC2_INSTANCE_LAUNCHING lifecycle transition for a node.

    For a single-master ASG the lifecycle action is continued immediately
    (presumably because the sole master's API server cannot be queried while
    it is still coming up — confirm against cluster bootstrap flow); for all
    other nodes, wait for the node to become Ready, label it, and CONTINUE
    the lifecycle action, or ABANDON it on timeout/failure.
    """

    asg_desired_capacity = asg.describe_auto_scaling_groups(
        AutoScalingGroupNames=[hook_info['asg_name']]
    )['AutoScalingGroups'][0]['DesiredCapacity']

    # A lone master is let through unconditionally.
    if 'master' in hook_info['node_role'] and asg_desired_capacity == 1:
        continue_lifecycle_action(asg, hook_info['asg_name'], hook_info['name'], hook_info['instance_id'])

    else:
        if node_ready(k8s_api, hook_info['node_name'], hook_info['launching_timeout']):
            append_node_labels(k8s_api, hook_info['node_name'], hook_info['node_role'], hook_info['instance_lifecycle'])
            continue_lifecycle_action(asg, hook_info['asg_name'], hook_info['name'], hook_info['instance_id'])
        else:
            abandon_lifecycle_action(asg, hook_info['asg_name'], hook_info['name'], hook_info['instance_id'])

def terminate_node(k8s_api, hook_info):
    """Handle the EC2_INSTANCE_TERMINATING lifecycle transition for a node.

    Verifies a master node is available, then cordons and drains the node
    before completing the lifecycle action with CONTINUE; any failure (or a
    missing master/node) abandons the lifecycle action instead.
    """

    try:
        # Draining pods requires a reachable control plane.
        if not master_ready(k8s_api, asg, elb, ec2, hook_info['asg_name'], hook_info['node_name'], hook_info['node_role'], hook_info['launching_timeout']):
            logger.error('There is no master node.')
            abandon_lifecycle_action(asg, hook_info['asg_name'], hook_info['name'], hook_info['instance_id'])
            return

        if not node_exists(k8s_api, hook_info['node_name']):
            logger.error('Node not found.')
            abandon_lifecycle_action(asg, hook_info['asg_name'], hook_info['name'], hook_info['instance_id'])
            return

        cordon_node(k8s_api, hook_info['node_name'])
        remove_all_pods(k8s_api, hook_info['node_name'])

        continue_lifecycle_action(asg, hook_info['asg_name'], hook_info['name'], hook_info['instance_id'])

    # Exception (not bare except): do not swallow SystemExit/KeyboardInterrupt.
    except Exception:
        logger.exception('There was an error removing the pods from the node {}'.format(hook_info['node_name']))
        abandon_lifecycle_action(asg, hook_info['asg_name'], hook_info['name'], hook_info['instance_id'])

def lambda_handler(event, context):
    """Entry point: dispatch ASG lifecycle-hook notifications delivered via SNS.

    Each SNS record carries a lifecycle-hook payload; records without a
    'LifecycleTransition' key (e.g. test notifications) are skipped.
    """

    logger.info(event)

    # process asg lifecycle hooks
    for record in event['Records']:

        hook_payload = json.loads(record['Sns']['Message'])

        # Skip notifications that are not lifecycle transitions.
        if 'LifecycleTransition' not in hook_payload:
            continue

        k8s_api_client, hook_info = hook_init(hook_payload)

        # execute specific action for lifecycle hook
        if hook_info['transition'] == 'autoscaling:EC2_INSTANCE_LAUNCHING':
            launch_node(k8s_api_client, hook_info)

        elif hook_info['transition'] == 'autoscaling:EC2_INSTANCE_TERMINATING':
            terminate_node(k8s_api_client, hook_info)
129 changes: 115 additions & 14 deletions modules/kubernetes/functions/k8s_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
from kubernetes.client.rest import ApiException

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.setLevel(logging.INFO)

MIRROR_POD_ANNOTATION_KEY = "kubernetes.io/config.mirror"
CONTROLLER_KIND_DAEMON_SET = "DaemonSet"


def cordon_node(api, node_name):
"""Marks the specified node as unschedulable, which means that no new pods can be launched on the
node by the Kubernetes scheduler.
Expand Down Expand Up @@ -103,22 +102,119 @@ def wait_until_empty(api, node_name, poll):
logger.debug("Still waiting for deletion of the following pods: {}".format(", ".join(map(lambda pod: pod.metadata.namespace + "/" + pod.metadata.name, pods))))
time.sleep(poll)

def master_ready(api, asg_client, lb_client, ec2_client, asg_name, node_name, node_role, timeout):
    """Determines whether a K8s master node is ready.

    Returns True immediately for non-master roles or multi-master ASGs.
    For a single-master ASG, waits up to `timeout` seconds until one of the
    ASG's non-terminating instances reports InService behind the ASG's
    classic load balancer, labels that master node, and returns True.
    Returns False on timeout or error.
    """

    # The asg instance refresh operation is not for master role
    if 'master' not in node_role:
        return True

    asg_info = asg_client.describe_auto_scaling_groups(
        AutoScalingGroupNames=[asg_name]
    )['AutoScalingGroups'][0]
    asg_instances = asg_info['Instances']
    asg_desired_capacity = asg_info['DesiredCapacity']
    asg_remain_instances = [instance['InstanceId'] for instance in asg_instances if 'Terminating' not in instance['LifecycleState']]

    # The master asg contains multiple nodes
    if asg_desired_capacity > 1:
        return True

    # There is only one node in the master asg, waiting for the node bind to lb
    waiting_timeout = time.time() + timeout

    lb_name = asg_client.describe_load_balancers(AutoScalingGroupName=asg_name)['LoadBalancers'][0]['LoadBalancerName']

    while True:
        if time.time() > waiting_timeout:
            # logger.error, not logger.exception: we are not inside an except
            # block here, so there is no active traceback to attach.
            logger.error('timeout waiting for master node {} ready'.format(node_name))
            return False

        try:
            lb_instances = lb_client.describe_load_balancers(LoadBalancerNames=[lb_name])['LoadBalancerDescriptions'][0]['Instances']

            for target_instance in lb_instances:
                if target_instance['InstanceId'] in asg_remain_instances:

                    target_instance_state = lb_client.describe_instance_health(
                        LoadBalancerName=lb_name,
                        Instances=[
                            {
                                'InstanceId': target_instance['InstanceId']
                            },
                        ]
                    )['InstanceStates'][0]['State']

                    if target_instance_state == 'InService':
                        # Label the surviving master — it may be a different
                        # instance than the one this hook is processing.
                        master_instance = ec2_client.describe_instances(InstanceIds=[target_instance['InstanceId']])['Reservations'][0]['Instances'][0]
                        node_name = master_instance['PrivateDnsName']
                        instance_lifecycle = 'Ec2Spot' if 'InstanceLifecycle' in master_instance else 'OnDemand'
                        append_node_labels(api, node_name, node_role, instance_lifecycle)

                        return True

            time.sleep(10)

        # Exception (not bare except): do not swallow SystemExit/KeyboardInterrupt.
        except Exception:
            logger.exception('There was an error waiting the node {} ready'.format(node_name))
            return False

def node_ready(api, node_name, timeout):
    """Determines whether the specified node is ready.

    Polls every 10 seconds until the node is registered and reports a
    'Ready' condition with status 'True'; returns False on timeout or error.
    """

    waiting_timeout = time.time() + timeout
    field_selector = 'metadata.name=' + node_name

    while True:
        if time.time() > waiting_timeout:
            # logger.error, not logger.exception: no active traceback here.
            logger.error('timeout waiting for node {} launch'.format(node_name))
            return False

        try:
            # The node is still not been registered to K8s
            if not node_exists(api, node_name):
                time.sleep(10)
                continue

            node = api.list_node(pretty=True, field_selector=field_selector).items[0]

            for condition in node.status.conditions:
                if condition.type == 'Ready' and condition.status == 'True':
                    return True

            time.sleep(10)
        # Exception (not bare except): do not swallow SystemExit/KeyboardInterrupt.
        except Exception:
            logger.exception('There was an error waiting the node {} ready'.format(node_name))
            return False

def node_exists(api, node_name):
    """Determines whether the specified node is still part of the cluster.

    Returns False (rather than raising) when the node list cannot be
    fetched, so callers can treat an unreachable API as "node gone".
    """

    try:
        nodes = api.list_node(pretty=True).items
        node = next((n for n in nodes if n.metadata.name == node_name), None)

        return node is not None
    # Exception (not bare except): do not swallow SystemExit/KeyboardInterrupt.
    except Exception:
        return False

def append_node_labels(api, node_name, node_role, instance_lifecycle):
    """Patch lifecycle and role labels onto the given K8s node.

    Adds 'lifecycle: <Ec2Spot|OnDemand>' and an empty-valued
    'node-role.kubernetes.io/<node_role>' label. Failures are logged and
    swallowed so labeling never blocks the lifecycle action.
    """

    node_role_label = "node-role.kubernetes.io/%s" % (node_role)

    patch_body = {
        "metadata": {
            "labels": {
                "lifecycle": instance_lifecycle,
                node_role_label: ""
            }
        }
    }

    try:
        api.patch_node(node_name, patch_body)
    # Exception (not bare except): do not swallow SystemExit/KeyboardInterrupt.
    except Exception:
        logger.exception('There was an error appending labels to the node {} '.format(node_name))

def abandon_lifecycle_action(asg_client, auto_scaling_group_name, lifecycle_hook_name, instance_id):
"""Completes the lifecycle action with the ABANDON result, which stops any remaining actions,
Expand All @@ -129,6 +225,11 @@ def abandon_lifecycle_action(asg_client, auto_scaling_group_name, lifecycle_hook
LifecycleActionResult='ABANDON',
InstanceId=instance_id)

# to-do: wait for node ready

# to-do: add node label for Kubernetes version >= 1.19
def continue_lifecycle_action(asg_client, auto_scaling_group_name, lifecycle_hook_name, instance_id):
    """Complete the lifecycle action with the CONTINUE result.

    CONTINUE lets the auto scaling group proceed with the remaining actions,
    such as other lifecycle hooks.
    """
    params = {
        'LifecycleHookName': lifecycle_hook_name,
        'AutoScalingGroupName': auto_scaling_group_name,
        'LifecycleActionResult': 'CONTINUE',
        'InstanceId': instance_id,
    }
    asg_client.complete_lifecycle_action(**params)
22 changes: 17 additions & 5 deletions modules/kubernetes/main.tf
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
locals {
lambda_environment_variables = {
CLUSTER_NAME = var.cluster_name
KUBE_CONFIG_BUCKET = var.kubeconfig_s3_bucket
KUBE_CONFIG_OBJECT = var.kubeconfig_s3_object
LAUNCHING_TIMEOUT = var.heartbeat_timeout["launching"]
TERMINATING_TIMEOUT = var.heartbeat_timeout["terminating"]
CLUSTER_NAME = var.cluster_name
KUBE_CONFIG_BUCKET = var.kubeconfig_s3_bucket
KUBE_CONFIG_OBJECT = var.kubeconfig_s3_object
KUBERNETES_NODE_ROLE = var.kubernetes_node_role
LAUNCHING_TIMEOUT = var.heartbeat_timeout["launching"]
TERMINATING_TIMEOUT = var.heartbeat_timeout["terminating"]
}
}

Expand Down Expand Up @@ -55,6 +56,7 @@ data "aws_iam_policy_document" "k8s_lifecycle" {
actions = [
"autoscaling:DescribeTags",
"autoscaling:DescribeAutoScalingGroups",
"autoscaling:DescribeLoadBalancers",
"autoscaling:CompleteLifecycleAction",
"ec2:DescribeInstances",
"ec2:CreateTags"
Expand All @@ -63,6 +65,16 @@ data "aws_iam_policy_document" "k8s_lifecycle" {
"*"
]
}
statement {
sid = "ELB"
actions = [
"elasticloadbalancing:DescribeLoadBalancers",
"elasticloadbalancing:DescribeInstanceHealth"
]
resources = [
"*"
]
}
statement {
sid = "S3"
actions = [
Expand Down
5 changes: 5 additions & 0 deletions modules/kubernetes/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ variable "kubeconfig_s3_object" {
type = string
}

# Role label suffix for nodes managed by the lifecycle-hook lambda; exported
# to the lambda as the KUBERNETES_NODE_ROLE environment variable.
variable "kubernetes_node_role" {
  description = "The kubernetes node role name, e.g. node-role.kubernetes.io/master"
  type = string
}

variable "lambda_handler" {
description = "The lifecycle hooks lambda handler"
type = string
Expand Down

0 comments on commit 9db3f39

Please sign in to comment.