fix race condition #52

Merged: 13 commits, Oct 16, 2022
go.mod: 1 addition & 1 deletion
@@ -7,6 +7,7 @@ require (
github.com/cenkalti/backoff/v4 v4.1.0
github.com/go-logr/logr v0.4.0
github.com/google/uuid v1.1.5
github.com/hashicorp/go-version v1.4.0
github.com/logrusorgru/aurora v2.0.3+incompatible
github.com/mattn/go-isatty v0.0.12
github.com/operator-framework/operator-lib v0.9.0
@@ -24,7 +25,6 @@ require (
k8s.io/client-go v0.22.6
k8s.io/klog v1.0.0
sigs.k8s.io/controller-runtime v0.10.0
github.com/hashicorp/go-version v1.4.0
)

require (
go.sum: 928 additions & 0 deletions

Large diffs are not rendered by default.

pkg/cloudprovider/aws/aws.go: 4 additions & 0 deletions
@@ -20,6 +20,7 @@ const (
validationErrorCode = "ValidationError"
alreadyAttachedMessage = "is already part of AutoScalingGroup"
alreadyDetachingMessage = "is not in InService or Standby"
notInCorrectState = "is not in correct state"
)

var providerIDRegex = regexp.MustCompile(`aws:\/\/\/[\w-]+\/([\w-]+)`)
@@ -45,6 +46,9 @@ func verifyIfErrorOccured(apiErr error, expectedMessage string) (bool, error) {
if awsErr.Code() == validationErrorCode && strings.Contains(awsErr.Message(), expectedMessage) {
return true, apiErr
}
if awsErr.Code() == validationErrorCode && strings.Contains(awsErr.Message(), notInCorrectState) {
return true, apiErr
}
}

return false, apiErr
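
The new notInCorrectState constant makes verifyIfErrorOccured treat a third ValidationError message as an expected race-condition outcome rather than a real failure. As an illustrative sketch only (not code from this PR or the AWS SDK), the same logic can be written as a single loop over the benign messages; the names benignValidationMessages and isBenignValidationError are hypothetical.

package sketch

import (
	"strings"

	"github.com/aws/aws-sdk-go/aws/awserr"
)

// benignValidationMessages mirrors the message constants above: ValidationError
// responses whose message contains one of these substrings are expected
// race-condition outcomes rather than real failures.
var benignValidationMessages = []string{
	"is already part of AutoScalingGroup",
	"is not in InService or Standby",
	"is not in correct state",
}

// isBenignValidationError reports whether apiErr is an AWS ValidationError whose
// message matches one of the expected substrings. It is a sketch, not the
// repository's verifyIfErrorOccured.
func isBenignValidationError(apiErr error) bool {
	awsErr, ok := apiErr.(awserr.Error)
	if !ok || awsErr.Code() != "ValidationError" {
		return false
	}
	for _, msg := range benignValidationMessages {
		if strings.Contains(awsErr.Message(), msg) {
			return true
		}
	}
	return false
}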
pkg/controller/cyclenoderequest/transitioner/transitions.go: 49 additions & 43 deletions
@@ -55,9 +55,9 @@ func (t *CycleNodeRequestTransitioner) transitionUndefined() (reconcile.Result,

// transitionPending transitions any CycleNodeRequests in the pending phase to the initialised phase
// Does the following:
// 1. fetches the current nodes by the label selector, and saves them as nodes to be terminated
// 2. describes the node group and checks that the number of instances in the node group matches the number we
// are planning on terminating
// 1. fetches the current nodes by the label selector, and saves them as nodes to be terminated
// 2. describes the node group and checks that the number of instances in the node group matches the number we
// are planning on terminating
func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, error) {
// Fetch the node names for the cycleNodeRequest, using the label selector provided
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Selecting nodes with label selector")
@@ -303,8 +303,8 @@ func (t *CycleNodeRequestTransitioner) transitionScalingUp() (reconcile.Result,

// Check we have waited long enough - give the node some time to start up
if time.Since(scaleUpStarted.Time) <= scaleUpWait {
t.rm.LogEvent(t.cycleNodeRequest, "ScalingUpWaiting", "Waiting for new nodes to be ready")
return reconcile.Result{Requeue: true, RequeueAfter: requeueDuration}, nil
t.rm.LogEvent(t.cycleNodeRequest, "ScalingUpWaiting", "Waiting for new nodes to be warmed up")
[Contributor Author] reduce the number of unnecessary requeues
return reconcile.Result{Requeue: true, RequeueAfter: scaleUpWait}, nil
}

nodeGroups, err := t.rm.CloudProvider.GetNodeGroups(t.cycleNodeRequest.GetNodeGroupNames())
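
The author's note above is about cutting reconcile churn while new nodes warm up: instead of requeueing on the generic requeueDuration, the transition now requeues once for the full scaleUpWait. Below is an illustrative sketch only, not code from this PR, of the same idea taken one step further by requeueing just for the time remaining in the grace period; the helper name and parameters are hypothetical.

package sketch

import (
	"time"

	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// requeueForRemainingWait waits out a scale-up grace period with a single
// requeue for the time actually left, instead of polling on a short fixed
// interval. Hypothetical helper, not part of this PR.
func requeueForRemainingWait(scaleUpStarted time.Time, scaleUpWait time.Duration) (reconcile.Result, bool) {
	elapsed := time.Since(scaleUpStarted)
	if elapsed >= scaleUpWait {
		// Grace period over: no requeue needed, carry on with the node checks.
		return reconcile.Result{}, false
	}
	// Requeue once for the remainder of the grace period.
	return reconcile.Result{Requeue: true, RequeueAfter: scaleUpWait - elapsed}, true
}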
@@ -413,51 +413,54 @@ func (t *CycleNodeRequestTransitioner) transitionCordoning() (reconcile.Result,

allNodesReadyForTermination := true
for _, node := range t.cycleNodeRequest.Status.CurrentNodes {
// If the node is not already cordoned, cordon it
cordoned, err := k8s.IsCordoned(node.Name, t.rm.RawClient)
if err != nil {
t.rm.Logger.Error(err, "failed to check if node is cordoned", "nodeName", node.Name)
return t.transitionToHealing(err)
}
if !cordoned {
if err := k8s.CordonNode(node.Name, t.rm.RawClient); err != nil {
// cordon only when k-node exists
if _, err := t.rm.GetNode(node.Name); err == nil {
// If the node is not already cordoned, cordon it
cordoned, err := k8s.IsCordoned(node.Name, t.rm.RawClient)
if err != nil {
t.rm.Logger.Error(err, "failed to check if node is cordoned", "nodeName", node.Name)
return t.transitionToHealing(err)
}
}

// Perform pre-termination checks after the node is cordoned
// Crucially, do this before the CNS is created for the node to begin termination
if !t.cycleNodeRequest.Spec.SkipPreTerminationChecks && len(t.cycleNodeRequest.Spec.PreTerminationChecks) > 0 {
// Try to send the trigger, if it has already been sent then this will
// be skipped in the function. The trigger must only be sent once
if err := t.sendPreTerminationTrigger(node); err != nil {
return t.transitionToHealing(errors.Wrapf(err, "failed to send pre-termination trigger, %s is still cordoned", node.Name))
if !cordoned {
if err := k8s.CordonNode(node.Name, t.rm.RawClient); err != nil {
return t.transitionToHealing(err)
}
}

// After the trigger has been sent, perform health checks to monitor if the node
// can be terminated. If all checks pass then it can be terminated.
allHealthChecksPassed, err := t.performPreTerminationHealthChecks(node)
if err != nil {
return t.transitionToHealing(errors.Wrapf(err, "failed to perform pre-termination health checks, %s is still cordoned", node.Name))
// Perform pre-termination checks after the node is cordoned
// Crucially, do this before the CNS is created for the node to begin termination
if !t.cycleNodeRequest.Spec.SkipPreTerminationChecks && len(t.cycleNodeRequest.Spec.PreTerminationChecks) > 0 {
// Try to send the trigger, if it has already been sent then this will
// be skipped in the function. The trigger must only be sent once
if err := t.sendPreTerminationTrigger(node); err != nil {
return t.transitionToHealing(errors.Wrapf(err, "failed to send pre-termination trigger, %s is still cordoned", node.Name))
}

// After the trigger has been sent, perform health checks to monitor if the node
// can be terminated. If all checks pass then it can be terminated.
allHealthChecksPassed, err := t.performPreTerminationHealthChecks(node)
if err != nil {
return t.transitionToHealing(errors.Wrapf(err, "failed to perform pre-termination health checks, %s is still cordoned", node.Name))
}

// If not all health checks have passed, it is not ready for termination yet
// But we can continue to trigger checks on the other nodes
if !allHealthChecksPassed {
allNodesReadyForTermination = false
continue
}
}

// If not all health checks have passed, it is not ready for termination yet
// But we can continue to trigger checks on the other nodes
if !allHealthChecksPassed {
allNodesReadyForTermination = false
continue
// Create a CycleNodeStatus CRD to start the termination process
if err := t.rm.Client.Create(context.TODO(), t.makeCycleNodeStatusForNode(node.Name)); err != nil {
return t.transitionToHealing(err)
}
}

// Create a CycleNodeStatus CRD to start the termination process
if err := t.rm.Client.Create(context.TODO(), t.makeCycleNodeStatusForNode(node.Name)); err != nil {
return t.transitionToHealing(err)
}

// Add a label to the node to show that we've started working on it
if err := k8s.AddLabelToNode(node.Name, cycleNodeLabel, t.cycleNodeRequest.Name, t.rm.RawClient); err != nil {
t.rm.Logger.Error(err, "patch failed: could not add cyclops label to node", "nodeName", node.Name)
return t.transitionToHealing(err)
// Add a label to the node to show that we've started working on it
if err := k8s.AddLabelToNode(node.Name, cycleNodeLabel, t.cycleNodeRequest.Name, t.rm.RawClient); err != nil {
t.rm.Logger.Error(err, "patch failed: could not add cyclops label to node", "nodeName", node.Name)
return t.transitionToHealing(err)
}
}
}
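
The wrapper added above ("cordon only when k-node exists") guards against the race where an instance is selected for cycling but its Kubernetes Node object has already disappeared. Below is a minimal sketch of that guard, not the repository's code; it assumes a hypothetical getNode lookup standing in for t.rm.GetNode and additionally distinguishes a NotFound error from other API errors.

package sketch

import (
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// shouldCordon decides whether a node is still worth cordoning. A missing Node
// object means the instance has already gone away (the race this PR addresses),
// so it is skipped; any other API error is surfaced so the reconcile loop retries.
// getNode is a hypothetical lookup standing in for t.rm.GetNode.
func shouldCordon(getNode func(name string) (*corev1.Node, error), name string) (bool, error) {
	if _, err := getNode(name); err != nil {
		if apierrors.IsNotFound(err) {
			// The kube Node is already gone; nothing to cordon here.
			return false, nil
		}
		// Transient API error: report it and let the controller requeue.
		return false, err
	}
	// The Node exists, so cordoning it is safe.
	return true, nil
}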

@@ -499,7 +502,7 @@ func (t *CycleNodeRequestTransitioner) transitionWaitingTermination() (reconcile
return t.transitionObject(desiredPhase)
}

// transitionFailed handles failed CycleNodeRequests
// transitionHealing handles healing CycleNodeRequests
func (t *CycleNodeRequestTransitioner) transitionHealing() (reconcile.Result, error) {
nodeGroups, err := t.rm.CloudProvider.GetNodeGroups(t.cycleNodeRequest.GetNodeGroupNames())
if err != nil {
@@ -511,6 +514,9 @@ func (t *CycleNodeRequestTransitioner) transitionHealing() (reconcile.Result, er
t.rm.LogEvent(t.cycleNodeRequest, "AttachingNodes", "Attaching instances to nodes group: %v", node.Name)
alreadyAttached, err := nodeGroups.AttachInstance(node.ProviderID, node.NodeGroupName)
if alreadyAttached {
t.rm.LogEvent(t.cycleNodeRequest,
"AttachingNodes", "Skip re-attaching instances to nodes group: %v, err: %v",
node.Name, err)
continue
}
if err != nil {
pkg/controller/cyclenoderequest/transitioner/util.go: 1 addition & 1 deletion
@@ -160,7 +160,7 @@ func (t *CycleNodeRequestTransitioner) finalReapChildren() (shouldRequeue bool,
}

switch t.cycleNodeRequest.Status.Phase {
case v1.CycleNodeRequestInitialised:
case v1.CycleNodeRequestInitialised, v1.CycleNodeRequestFailed:
[Contributor Author] fix a Failed CNR getting stuck in an infinite requeue loop

if t.cycleNodeRequest.Status.ActiveChildren == 0 {
// No more work to be done, stop processing this request
return false, nil
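
The change above lets finalReapChildren treat a Failed CycleNodeRequest like an Initialised one: once it has no active children, the controller stops requeueing instead of looping forever. A simplified, hypothetical sketch of that stop condition follows, using plain strings in place of the v1 phase constants.

package sketch

// shouldStopRequeue is a simplified stand-in for the finalReapChildren switch:
// once a CycleNodeRequest is Initialised or Failed and has no active children,
// there is no more work to do and the reconciler must not requeue it again.
func shouldStopRequeue(phase string, activeChildren int) bool {
	switch phase {
	case "Initialised", "Failed":
		return activeChildren == 0
	default:
		return false
	}
}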
pkg/controller/cyclenodestatus/transitioner/transitions.go: 2 additions & 1 deletion
@@ -48,7 +48,8 @@ func (t *CycleNodeStatusTransitioner) transitionUndefined() (reconcile.Result, e
// RemovingLabelsFromPods phase based on the .Spec.Method provided.
// Gets the requested node from the cloud provider and from kube and performs sanity checks. Depending on these checks
// the CycleNodeStatus may go straight to Failed or Successful.
// If the node has problems then it will transition straight to Failed.
//
// If the node has problems then it will transition straight to Failed.
func (t *CycleNodeStatusTransitioner) transitionPending() (reconcile.Result, error) {
t.rm.LogEvent(t.cycleNodeStatus, "FetchingNode", "Fetching information about node: %v", t.cycleNodeStatus.Spec.NodeName)
node, err := t.rm.GetNode(t.cycleNodeStatus.Spec.NodeName)