Skip to content

Commit

Permalink
WIP: Add WaitForCullToComplete task
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyley committed Jun 4, 2024
1 parent 753e0e2 commit 20fa5f2
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ object AutoScaling extends DeploymentType with BucketParameters {
terminationGrace(pkg, target, reporter)
),
ResumeAlarmNotifications(autoScalingGroup, target.region),
WaitForCullToComplete(
autoScalingGroup,
secondsToWait(pkg, target, reporter),
target.region
),
WaitForStabilization(
autoScalingGroup,
secondsToWait(pkg, target, reporter),
Expand Down
182 changes: 84 additions & 98 deletions magenta-lib/src/main/scala/magenta/tasks/ASGTasks.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package magenta.tasks

import magenta.deployment_type.AutoScalingGroupInfo
import magenta.tasks.CullInstancesWithTerminationTag.TerminatingStates
import magenta.tasks.EC2.withEc2Client
import magenta.tasks.autoscaling.CullSummary
import magenta.{KeyRing, _}
import software.amazon.awssdk.awscore.exception.AwsServiceException
import software.amazon.awssdk.services.autoscaling.AutoScalingClient
import software.amazon.awssdk.services.autoscaling.model.{AutoScalingGroup, Instance, LifecycleState, SetInstanceProtectionRequest}
import software.amazon.awssdk.services.autoscaling.model.{
AutoScalingGroup,
Instance,
LifecycleState,
SetInstanceProtectionRequest
}
import software.amazon.awssdk.services.ec2.Ec2Client

import java.time.Duration
Expand All @@ -17,9 +23,8 @@ case class CheckGroupSize(info: AutoScalingGroupInfo, region: Region)(implicit
override def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit = {
val doubleCapacity = asg.desiredCapacity * 2
resources.reporter.verbose(
s"ASG desired = ${asg.desiredCapacity}; ASG max = ${asg.maxSize}; Target = $doubleCapacity"
Expand All @@ -43,11 +48,10 @@ case class TagCurrentInstancesWithTerminationTag(
override def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit = {
if (asg.instances.asScala.nonEmpty) {
EC2.withEc2Client(keyRing, region, resources) { ec2Client =>
withEc2Client(keyRing, region, resources) { ec2Client =>
resources.reporter.verbose(
s"Tagging ${asg.instances.asScala.toList.map(_.instanceId).mkString(", ")}"
)
Expand All @@ -73,9 +77,8 @@ case class ProtectCurrentInstances(info: AutoScalingGroupInfo, region: Region)(
override def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit = {
val instances = asg.instances.asScala.toList
val instancesInService =
instances.filter(_.lifecycleState == LifecycleState.IN_SERVICE)
Expand Down Expand Up @@ -104,9 +107,8 @@ case class DoubleSize(info: AutoScalingGroupInfo, region: Region)(implicit
override def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit = {
val targetCapacity = asg.desiredCapacity * 2
resources.reporter.verbose(s"Doubling capacity to $targetCapacity")
ASG.desiredCapacity(asg.autoScalingGroupName, targetCapacity, asgClient)
Expand All @@ -125,9 +127,8 @@ sealed abstract class Pause(duration: Duration)(implicit val keyRing: KeyRing)
def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit = {
if (asg.desiredCapacity == 0 && asg.instances.isEmpty)
resources.reporter.verbose(
"Skipping pause as there are no instances and desired capacity is zero"
Expand Down Expand Up @@ -175,9 +176,8 @@ case class WaitForStabilization(
override def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit = {
ELB.withClient(keyRing, region, resources) { elbClient =>
check(resources.reporter, stopFlag) {
try {
Expand All @@ -188,10 +188,9 @@ case class WaitForStabilization(
case Right(()) => true
}
} catch {
case e: AwsServiceException if isRateExceeded(e) => {
case e: AwsServiceException if isRateExceeded(e) =>
resources.reporter.info(e.getMessage)
false
}
}
}
}
Expand All @@ -205,16 +204,6 @@ case class WaitForStabilization(
s"Check the desired number of hosts in both the ASG ($asgName) and ELB are up and that the number of hosts match"
}

object CullInstancesWithTerminationTag {
// See https://docs.aws.amazon.com/autoscaling/ec2/userguide/lifecycle-hooks.html#lifecycle-hooks-overview
val TerminatingStates: Set[LifecycleState] = Set(
LifecycleState.TERMINATING,
LifecycleState.TERMINATING_WAIT,
LifecycleState.TERMINATING_PROCEED,
LifecycleState.TERMINATED
)
}

case class CullInstancesWithTerminationTag(
info: AutoScalingGroupInfo,
region: Region
Expand All @@ -223,49 +212,14 @@ case class CullInstancesWithTerminationTag(
override def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
EC2.withEc2Client(keyRing, region, resources) { ec2Client =>
ELB.withClient(keyRing, region, resources) { elbClient =>
val allInstances = asg.instances.asScala.toSeq
resources.reporter.verbose(
s"Found the following instances: ${allInstances.map(_.instanceId).mkString(", ")}"
)
val instancesToKill = allInstances
.filter(instance => {
if (
instance.lifecycleState == LifecycleState.UNKNOWN_TO_SDK_VERSION
) {
logger.warn(
s"Instance lifecycle state ${instance.lifecycleStateAsString} isn't recognised in the AWS SDK. Is there a later version of the AWS SDK available?"
)
}

val isAlreadyTerminating =
TerminatingStates.contains(instance.lifecycleState)
val isTaggedForTermination =
EC2.hasTag(instance, "Magenta", "Terminate", ec2Client)

isTaggedForTermination && !isAlreadyTerminating
})

reportOnRetainedInstances(
ec2Client,
resources,
allInstances,
instancesToKill
)

val orderedInstancesToKill =
instancesToKill.transposeBy(_.availabilityZone)
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit = {
withEc2Client(keyRing, region, resources) { implicit ec2Client =>
val instancesToKill =
prepareOrderedInstancesToKill(asg, resources.reporter)
ELB.withClient(keyRing, region, resources) { implicit elbClient =>
try {
resources.reporter.verbose(
s"Culling instances: ${orderedInstancesToKill.map(_.instanceId).mkString(", ")}"
)
orderedInstancesToKill.foreach(instance =>
ASG.cull(asg, instance, asgClient, elbClient)
)
instancesToKill.foreach(instance => ASG.cull(asg, instance))
} catch {
case e: AwsServiceException if desiredSizeReset(e) =>
resources.reporter.warning(
Expand All @@ -285,19 +239,33 @@ case class CullInstancesWithTerminationTag(
.contains("ValidationError")
}

private def prepareOrderedInstancesToKill(
asg: AutoScalingGroup,
reporter: DeployReporter
)(implicit ec2Client: Ec2Client): Seq[Instance] = {
val cullSummary = CullSummary.forAsg(asg, reporter)
reportOnRetainedInstances(reporter, cullSummary)
val orderedInstancesToKill =
cullSummary.instancesToKill.toSeq.transposeBy(_.availabilityZone)
reporter.verbose(
s"Culling instances: ${orderedInstancesToKill.map(_.instanceId).mkString(", ")}"
)
orderedInstancesToKill
}

private def reportOnRetainedInstances(
ec2Client: Ec2Client,
resources: DeploymentResources,
allInstances: Seq[Instance],
instancesToKill: Seq[Instance]
): Unit = {
val instancesToRetain = allInstances.diff(instancesToKill).toList
resources.reporter.verbose(
reporter: DeployReporter,
cullSummary: CullSummary
)(implicit ec2Client: Ec2Client): Unit = {
val instancesToKill = cullSummary.instancesToKill
val instancesToRetain =
cullSummary.allInstances.diff(instancesToKill).toList
reporter.verbose(
s"Decided to keep the following instances: ${instancesToRetain.map(_.instanceId).mkString(", ")}"
)

if (instancesToRetain.size != instancesToKill.size) {
resources.reporter.warning(
reporter.warning(
s"Terminating ${instancesToKill.size} instances and retaining ${instancesToRetain.size} instances"
)
logger.warn(
Expand All @@ -306,18 +274,40 @@ case class CullInstancesWithTerminationTag(
instancesToRetain.foreach(instanceToRetain => {
val tags = EC2
.allTags(instanceToRetain, ec2Client)
.toList
.map(tag => s"${tag.key}:${tag.value}")
.mkString(", ")
resources.reporter.verbose(
reporter.verbose(
s"Will not terminate $instanceToRetain. State: ${instanceToRetain.lifecycleStateAsString}. Tags: $tags"
)
})
}
}

lazy val description =
s"Terminate instances in $asgName with the termination tag for this deploy"
s"Request termination for instances in $asgName with the termination tag for this deploy"
}

case class WaitForCullToComplete(
info: AutoScalingGroupInfo,
duration: Duration,
region: Region
)(implicit val keyRing: KeyRing)
extends ASGTask
with SlowRepeatedPollingCheck {

override def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit =
withEc2Client(keyRing, region, resources) { implicit ec2Client =>
check(resources.reporter, stopFlag) {
CullSummary.forAsg(asg, resources.reporter).isCullComplete
}
}

lazy val description: String =
s"Check that all instances tagged for termination in $asgName have been terminated"
}

case class SuspendAlarmNotifications(
Expand All @@ -329,9 +319,8 @@ case class SuspendAlarmNotifications(
override def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit = {
ASG.suspendAlarmNotifications(asg.autoScalingGroupName, asgClient)
}

Expand All @@ -346,11 +335,9 @@ case class ResumeAlarmNotifications(info: AutoScalingGroupInfo, region: Region)(
override def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit =
ASG.resumeAlarmNotifications(asg.autoScalingGroupName, asgClient)
}

lazy val description =
s"Resuming Alarm Notifications - $asgName will scale on any configured alarms"
Expand All @@ -367,15 +354,14 @@ trait ASGTask extends Task {
def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit

override def execute(
resources: DeploymentResources,
stopFlag: => Boolean
): Unit = {
ASG.withAsgClient(keyRing, region, resources) { asgClient =>
ASG.withAsgClient(keyRing, region, resources) { implicit asgClient =>
resources.reporter.verbose(
s"Looked up group matching tags ${info.tagRequirements}; identified ${info.asg.autoScalingGroupARN}"
)
Expand All @@ -384,7 +370,7 @@ trait ASGTask extends Task {
// For example, we need to make sure that we have the latest desired capacity when doubling the size of the ASG.
val latestAsgState =
ASG.getGroupByName(asgName, asgClient, resources.reporter)
execute(latestAsgState, resources, stopFlag, asgClient)
execute(latestAsgState, resources, stopFlag)
}
}
}
7 changes: 4 additions & 3 deletions magenta-lib/src/main/scala/magenta/tasks/AWS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,10 @@ object ASG {

def cull(
asg: AutoScalingGroup,
instance: ASGInstance,
asgClient: AutoScalingClient,
elbClient: ELB.Client
instance: ASGInstance
)(implicit
elbClient: ELB.Client,
asgClient: AutoScalingClient
): TerminateInstanceInAutoScalingGroupResponse = {
ELB.deregister(elbNames(asg), elbTargetArns(asg), instance, elbClient)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package magenta.tasks

import magenta.artifact.S3Path
import magenta.tasks.CloudFormationParameters.{ExistingParameter, InputParameter, TemplateParameter}
import magenta.tasks.CloudFormationParameters.{
ExistingParameter,
InputParameter,
TemplateParameter
}
import magenta.tasks.UpdateCloudFormationTask._
import magenta.{DeployReporter, DeploymentResources, KeyRing, Region}
import software.amazon.awssdk.services.cloudformation.CloudFormationClient
Expand Down
Loading

0 comments on commit 20fa5f2

Please sign in to comment.