Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix race condition #52

Merged
merged 13 commits into from
Oct 16, 2022
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/cenkalti/backoff/v4 v4.1.0
github.com/go-logr/logr v0.4.0
github.com/google/uuid v1.1.5
github.com/hashicorp/go-version v1.4.0
github.com/logrusorgru/aurora v2.0.3+incompatible
github.com/mattn/go-isatty v0.0.12
github.com/operator-framework/operator-lib v0.9.0
Expand All @@ -24,7 +25,6 @@ require (
k8s.io/client-go v0.22.6
k8s.io/klog v1.0.0
sigs.k8s.io/controller-runtime v0.10.0
github.com/hashicorp/go-version v1.4.0
)

require (
Expand Down
928 changes: 928 additions & 0 deletions go.sum

Large diffs are not rendered by default.

26 changes: 18 additions & 8 deletions pkg/cloudprovider/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,27 @@ func instanceIDToProviderID(instanceID, availabilityZone string) (string, error)
return fmt.Sprintf("aws:///%s/%s", availabilityZone, instanceID), nil
}

func verifyIfErrorOccured(apiErr error, expectedMessage string) (bool, error) {
if awsErr, ok := apiErr.(awserr.Error); ok {
// process SDK error: Unfortunately there's no generic ValidationError in the SDK and no FailedAttach/FailedDetach error. Check manually
if awsErr.Code() == validationErrorCode && strings.Contains(awsErr.Message(), expectedMessage) {
return true, apiErr
// verifyIfErrorOccurred reports whether apiErr is an AWS SDK validation error
// whose message contains any of the expectedMessage substrings.
//
// The first return value is true only when apiErr is an awserr.Error whose
// code equals validationErrorCode and whose message contains at least one of
// the expected substrings. The second return value is always apiErr itself,
// unchanged, so callers can still inspect or log it.
func verifyIfErrorOccurred(apiErr error, expectedMessage ...string) (bool, error) {
	// The SDK has no generic ValidationError and no FailedAttach/FailedDetach
	// error types, so the code and message have to be checked manually.
	// Neither the type assertion nor the code check depends on the message
	// being tested, so both are done once, before the loop.
	awsErr, ok := apiErr.(awserr.Error)
	if !ok || awsErr.Code() != validationErrorCode {
		return false, apiErr
	}

	message := awsErr.Message()
	for _, msg := range expectedMessage {
		if strings.Contains(message, msg) {
			return true, apiErr
		}
	}

	return false, apiErr
}

// verifyIfErrorOccurredWithDefaults calls verifyIfErrorOccurred with
// expectedMessage plus a built-in default message ("is not in correct state"),
// so a match on either one is reported as true. The returned error is whatever
// verifyIfErrorOccurred returns for apiErr.
func verifyIfErrorOccurredWithDefaults(apiErr error, expectedMessage string) (bool, error) {
	skipErrs := []string{
		// Default error message that is always matched in addition to the
		// caller-supplied one.
		"is not in correct state",
		expectedMessage,
	}
	return verifyIfErrorOccurred(apiErr, skipErrs...)
}

type provider struct {
autoScalingService *autoscaling.AutoScaling
ec2Service *ec2.EC2
Expand Down Expand Up @@ -267,7 +277,7 @@ func (a *autoscalingGroups) DetachInstance(providerID string) (alreadyDetaching
ShouldDecrementDesiredCapacity: aws.Bool(false),
})

return verifyIfErrorOccured(apiErr, alreadyDetachingMessage)
return verifyIfErrorOccurred(apiErr, alreadyDetachingMessage)
}

// AttachInstance attaches the instance to the Autoscaling group
Expand All @@ -287,7 +297,7 @@ func (a *autoscalingGroups) AttachInstance(providerID, nodeGroup string) (alread
InstanceIds: aws.StringSlice([]string{instanceID}),
})

return verifyIfErrorOccured(apiErr, alreadyAttachedMessage)
return verifyIfErrorOccurredWithDefaults(apiErr, alreadyAttachedMessage)
}

func (a *autoscalingGroups) instanceOutOfDate(instance *autoscaling.Instance) bool {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/cyclenoderequest/transitioner/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (t *CycleNodeRequestTransitioner) sendPreTerminationTrigger(node v1.CycleNo
}

// Send the trigger, disregard the response body
statusCode, _, err := t.makeRequest(http.MethodPost, httpClient, endpoint)
statusCode, res, err := t.makeRequest(http.MethodPost, httpClient, endpoint)
if err != nil {
return fmt.Errorf("sending trigger failed: %v", err)
}
Expand All @@ -360,7 +360,7 @@ func (t *CycleNodeRequestTransitioner) sendPreTerminationTrigger(node v1.CycleNo
}

if !statusCodeFound {
return fmt.Errorf("got unexpected status code after sending trigger: %d", statusCode)
return fmt.Errorf("got unexpected status code after sending trigger: %d, resp: %s", statusCode, res)
}

now := metav1.Now()
Expand Down
32 changes: 23 additions & 9 deletions pkg/controller/cyclenoderequest/transitioner/transitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"github.com/atlassian-labs/cyclops/pkg/k8s"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -55,9 +57,9 @@ func (t *CycleNodeRequestTransitioner) transitionUndefined() (reconcile.Result,

// transitionPending transitions any CycleNodeRequests in the pending phase to the initialised phase
// Does the following:
// 1. fetches the current nodes by the label selector, and saves them as nodes to be terminated
// 2. describes the node group and checks that the number of instances in the node group matches the number we
// are planning on terminating
// 1. fetches the current nodes by the label selector, and saves them as nodes to be terminated
// 2. describes the node group and checks that the number of instances in the node group matches the number we
// are planning on terminating
func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, error) {
// Fetch the node names for the cycleNodeRequest, using the label selector provided
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Selecting nodes with label selector")
Expand Down Expand Up @@ -303,8 +305,8 @@ func (t *CycleNodeRequestTransitioner) transitionScalingUp() (reconcile.Result,

// Check we have waited long enough - give the node some time to start up
if time.Since(scaleUpStarted.Time) <= scaleUpWait {
t.rm.LogEvent(t.cycleNodeRequest, "ScalingUpWaiting", "Waiting for new nodes to be ready")
return reconcile.Result{Requeue: true, RequeueAfter: requeueDuration}, nil
t.rm.LogEvent(t.cycleNodeRequest, "ScalingUpWaiting", "Waiting for new nodes to be warmed up")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reduce the number of unnecessary requeues.

return reconcile.Result{Requeue: true, RequeueAfter: scaleUpWait}, nil
}

nodeGroups, err := t.rm.CloudProvider.GetNodeGroups(t.cycleNodeRequest.GetNodeGroupNames())
Expand Down Expand Up @@ -374,7 +376,8 @@ func (t *CycleNodeRequestTransitioner) transitionScalingUp() (reconcile.Result,
for i, node := range t.cycleNodeRequest.Status.CurrentNodes {
if nodeToRemove.Name == node.Name {
t.rm.LogEvent(t.cycleNodeRequest, "RaceCondition", "Node %v was prematurely terminated.", node.Name)
t.cycleNodeRequest.Status.CurrentNodes = append(t.cycleNodeRequest.Status.CurrentNodes[:i], t.cycleNodeRequest.Status.CurrentNodes[i+1:]...)
t.cycleNodeRequest.Status.CurrentNodes = append(t.cycleNodeRequest.Status.CurrentNodes[:i],
t.cycleNodeRequest.Status.CurrentNodes[i+1:]...)
break
}
}
Expand Down Expand Up @@ -415,6 +418,10 @@ func (t *CycleNodeRequestTransitioner) transitionCordoning() (reconcile.Result,
for _, node := range t.cycleNodeRequest.Status.CurrentNodes {
// If the node is not already cordoned, cordon it
cordoned, err := k8s.IsCordoned(node.Name, t.rm.RawClient)
// Skip handling the node if it doesn't exist
if apierrors.IsNotFound(err) {
continue
}
if err != nil {
t.rm.Logger.Error(err, "failed to check if node is cordoned", "nodeName", node.Name)
return t.transitionToHealing(err)
Expand All @@ -431,14 +438,18 @@ func (t *CycleNodeRequestTransitioner) transitionCordoning() (reconcile.Result,
// Try to send the trigger. If it has already been sent then this will
// be skipped in the function. The trigger must only be sent once.
if err := t.sendPreTerminationTrigger(node); err != nil {
return t.transitionToHealing(errors.Wrapf(err, "failed to send pre-termination trigger, %s is still cordononed", node.Name))
t.rm.LogEvent(t.cycleNodeRequest,
"PreTerminationTriggerFailed", "failed to send pre-termination trigger to %s, err: %v", node.Name, err)
return t.transitionToHealing(errors.Wrapf(err, "failed to send pre-termination trigger to %s", node.Name))
}

// After the trigger has been sent, perform health checks to monitor if the node
// can be terminated. If all checks pass then it can be terminated.
allHealthChecksPassed, err := t.performPreTerminationHealthChecks(node)
if err != nil {
return t.transitionToHealing(errors.Wrapf(err, "failed to perform pre-termination health checks, %s is still cordononed", node.Name))
t.rm.LogEvent(t.cycleNodeRequest, "PreTerminationHealChecks",
"failed to perform pre-termination health checks for %v, err: %v", node.Name, err)
return t.transitionToHealing(errors.Wrapf(err, "failed to perform pre-termination health checks for %s", node.Name))
}

// If not all health checks have passed, it is not ready for termination yet
Expand Down Expand Up @@ -499,7 +510,7 @@ func (t *CycleNodeRequestTransitioner) transitionWaitingTermination() (reconcile
return t.transitionObject(desiredPhase)
}

// transitionFailed handles failed CycleNodeRequests
// transitionHealing handles healing CycleNodeRequests
func (t *CycleNodeRequestTransitioner) transitionHealing() (reconcile.Result, error) {
nodeGroups, err := t.rm.CloudProvider.GetNodeGroups(t.cycleNodeRequest.GetNodeGroupNames())
if err != nil {
Expand All @@ -511,6 +522,9 @@ func (t *CycleNodeRequestTransitioner) transitionHealing() (reconcile.Result, er
t.rm.LogEvent(t.cycleNodeRequest, "AttachingNodes", "Attaching instances to nodes group: %v", node.Name)
alreadyAttached, err := nodeGroups.AttachInstance(node.ProviderID, node.NodeGroupName)
if alreadyAttached {
t.rm.LogEvent(t.cycleNodeRequest,
"AttachingNodes", "Skip re-attaching instances to nodes group: %v, err: %v",
node.Name, err)
continue
}
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/cyclenoderequest/transitioner/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (t *CycleNodeRequestTransitioner) finalReapChildren() (shouldRequeue bool,
}

switch t.cycleNodeRequest.Status.Phase {
case v1.CycleNodeRequestInitialised:
case v1.CycleNodeRequestInitialised, v1.CycleNodeRequestFailed:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix Failed CNRs getting stuck in an infinite re-queue loop.

if t.cycleNodeRequest.Status.ActiveChildren == 0 {
// No more work to be done, stop processing this request
return false, nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/cyclenodestatus/transitioner/transitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func (t *CycleNodeStatusTransitioner) transitionUndefined() (reconcile.Result, e
// RemovingLabelsFromPods phase based on the .Spec.Method provided.
// Gets the requested node from the cloud provider and from kube and performs sanity checks. Depending on these checks
// the CycleNodeStatus may go straight to Failed or Successful.
// If the node has problems then it will transition straight to Failed.
//
// If the node has problems then it will transition straight to Failed.
func (t *CycleNodeStatusTransitioner) transitionPending() (reconcile.Result, error) {
t.rm.LogEvent(t.cycleNodeStatus, "FetchingNode", "Fetching information about node: %v", t.cycleNodeStatus.Spec.NodeName)
node, err := t.rm.GetNode(t.cycleNodeStatus.Spec.NodeName)
Expand Down
1 change: 1 addition & 0 deletions pkg/k8s/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
Expand Down