Skip to content

Commit

Permalink
Merge pull request #2665 from nojnhuh/async
Browse files Browse the repository at this point in the history
fix irrecoverable errors in async operations
  • Loading branch information
k8s-ci-robot authored Sep 28, 2022
2 parents bf5bc1c + ef6f663 commit a38912d
Show file tree
Hide file tree
Showing 67 changed files with 384 additions and 449 deletions.
4 changes: 2 additions & 2 deletions azure/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ type ClusterDescriber interface {
// AsyncStatusUpdater is an interface used to keep track of long running operations in Status that has Conditions and Futures.
type AsyncStatusUpdater interface {
SetLongRunningOperationState(*infrav1.Future)
GetLongRunningOperationState(string, string) *infrav1.Future
DeleteLongRunningOperationState(string, string)
GetLongRunningOperationState(string, string, string) *infrav1.Future
DeleteLongRunningOperationState(string, string, string)
UpdatePutStatus(clusterv1.ConditionType, string, error)
UpdateDeleteStatus(clusterv1.ConditionType, string, error)
UpdatePatchStatus(clusterv1.ConditionType, string, error)
Expand Down
16 changes: 8 additions & 8 deletions azure/mock_azure/azure_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions azure/scope/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,13 +914,13 @@ func (s *ClusterScope) SetLongRunningOperationState(future *infrav1.Future) {
}

// GetLongRunningOperationState will get the future on the AzureCluster status.
func (s *ClusterScope) GetLongRunningOperationState(name, service string) *infrav1.Future {
return futures.Get(s.AzureCluster, name, service)
func (s *ClusterScope) GetLongRunningOperationState(name, service, futureType string) *infrav1.Future {
return futures.Get(s.AzureCluster, name, service, futureType)
}

// DeleteLongRunningOperationState will delete the future from the AzureCluster status.
func (s *ClusterScope) DeleteLongRunningOperationState(name, service string) {
futures.Delete(s.AzureCluster, name, service)
func (s *ClusterScope) DeleteLongRunningOperationState(name, service, futureType string) {
futures.Delete(s.AzureCluster, name, service, futureType)
}

// UpdateDeleteStatus updates a condition on the AzureCluster status after a DELETE operation.
Expand Down
14 changes: 7 additions & 7 deletions azure/scope/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,9 @@ func (m *MachineScope) Subnet() infrav1.SubnetSpec {

// AvailabilityZone returns the AzureMachine Availability Zone.
// Priority for selecting the AZ is
// 1) Machine.Spec.FailureDomain
// 2) AzureMachine.Spec.FailureDomain (This is to support deprecated AZ)
// 3) No AZ
// 1. Machine.Spec.FailureDomain
// 2. AzureMachine.Spec.FailureDomain (This is to support deprecated AZ)
// 3. No AZ
func (m *MachineScope) AvailabilityZone() string {
if m.Machine.Spec.FailureDomain != nil {
return *m.Machine.Spec.FailureDomain
Expand Down Expand Up @@ -687,13 +687,13 @@ func (m *MachineScope) SetLongRunningOperationState(future *infrav1.Future) {
}

// GetLongRunningOperationState will get the future on the AzureMachine status.
func (m *MachineScope) GetLongRunningOperationState(name, service string) *infrav1.Future {
return futures.Get(m.AzureMachine, name, service)
func (m *MachineScope) GetLongRunningOperationState(name, service, futureType string) *infrav1.Future {
return futures.Get(m.AzureMachine, name, service, futureType)
}

// DeleteLongRunningOperationState will delete the future from the AzureMachine status.
func (m *MachineScope) DeleteLongRunningOperationState(name, service string) {
futures.Delete(m.AzureMachine, name, service)
func (m *MachineScope) DeleteLongRunningOperationState(name, service, futureType string) {
futures.Delete(m.AzureMachine, name, service, futureType)
}

// UpdateDeleteStatus updates a condition on the AzureMachine status after a DELETE operation.
Expand Down
12 changes: 7 additions & 5 deletions azure/scope/machinepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,9 @@ func (m *MachinePoolScope) applyAzureMachinePoolMachines(ctx context.Context) er
return nil
}

if futures.Has(m.AzureMachinePool, m.Name(), ScalesetsServiceName) {
if futures.Has(m.AzureMachinePool, m.Name(), ScalesetsServiceName, infrav1.PatchFuture) ||
futures.Has(m.AzureMachinePool, m.Name(), ScalesetsServiceName, infrav1.PutFuture) ||
futures.Has(m.AzureMachinePool, m.Name(), ScalesetsServiceName, infrav1.DeleteFuture) {
log.V(4).Info("exiting early due an in-progress long running operation on the ScaleSet")
// exit early to be less greedy about delete
return nil
Expand Down Expand Up @@ -377,13 +379,13 @@ func (m *MachinePoolScope) SetLongRunningOperationState(future *infrav1.Future)
}

// GetLongRunningOperationState will get the future on the AzureMachinePool status.
func (m *MachinePoolScope) GetLongRunningOperationState(name, service string) *infrav1.Future {
return futures.Get(m.AzureMachinePool, name, service)
func (m *MachinePoolScope) GetLongRunningOperationState(name, service, futureType string) *infrav1.Future {
return futures.Get(m.AzureMachinePool, name, service, futureType)
}

// DeleteLongRunningOperationState will delete the future from the AzureMachinePool status.
func (m *MachinePoolScope) DeleteLongRunningOperationState(name, service string) {
futures.Delete(m.AzureMachinePool, name, service)
func (m *MachinePoolScope) DeleteLongRunningOperationState(name, service, futureType string) {
futures.Delete(m.AzureMachinePool, name, service, futureType)
}

// setProvisioningStateAndConditions sets the AzureMachinePool provisioning state and conditions.
Expand Down
8 changes: 4 additions & 4 deletions azure/scope/machinepoolmachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,13 @@ func (s *MachinePoolMachineScope) SetLongRunningOperationState(future *infrav1.F
}

// GetLongRunningOperationState will get the future on the AzureMachinePoolMachine status.
func (s *MachinePoolMachineScope) GetLongRunningOperationState(name, service string) *infrav1.Future {
return futures.Get(s.AzureMachinePoolMachine, name, service)
func (s *MachinePoolMachineScope) GetLongRunningOperationState(name, service, futureType string) *infrav1.Future {
return futures.Get(s.AzureMachinePoolMachine, name, service, futureType)
}

// DeleteLongRunningOperationState will delete the future from the AzureMachinePoolMachine status.
func (s *MachinePoolMachineScope) DeleteLongRunningOperationState(name, service string) {
futures.Delete(s.AzureMachinePoolMachine, name, service)
func (s *MachinePoolMachineScope) DeleteLongRunningOperationState(name, service, futureType string) {
futures.Delete(s.AzureMachinePoolMachine, name, service, futureType)
}

// UpdateDeleteStatus updates a condition on the AzureMachinePoolMachine status after a DELETE operation.
Expand Down
8 changes: 4 additions & 4 deletions azure/scope/managedcontrolplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,13 +561,13 @@ func (s *ManagedControlPlaneScope) SetLongRunningOperationState(future *infrav1.
}

// GetLongRunningOperationState will get the future on the AzureManagedControlPlane status.
func (s *ManagedControlPlaneScope) GetLongRunningOperationState(name, service string) *infrav1.Future {
return futures.Get(s.ControlPlane, name, service)
func (s *ManagedControlPlaneScope) GetLongRunningOperationState(name, service, futureType string) *infrav1.Future {
return futures.Get(s.ControlPlane, name, service, futureType)
}

// DeleteLongRunningOperationState will delete the future from the AzureManagedControlPlane status.
func (s *ManagedControlPlaneScope) DeleteLongRunningOperationState(name, service string) {
futures.Delete(s.ControlPlane, name, service)
func (s *ManagedControlPlaneScope) DeleteLongRunningOperationState(name, service, futureType string) {
futures.Delete(s.ControlPlane, name, service, futureType)
}

// UpdateDeleteStatus updates a condition on the AzureManagedControlPlane status after a DELETE operation.
Expand Down
16 changes: 8 additions & 8 deletions azure/scope/managedmachinepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,20 +221,20 @@ func (s *ManagedMachinePoolScope) SetAgentPoolReady(ready bool) {
s.InfraMachinePool.Status.Ready = ready
}

// SetLongRunningOperationState will set the future on the AzureManagedControlPlane status to allow the resource to continue
// SetLongRunningOperationState will set the future on the AzureManagedMachinePool status to allow the resource to continue
// in the next reconciliation.
func (s *ManagedMachinePoolScope) SetLongRunningOperationState(future *infrav1.Future) {
futures.Set(s.ControlPlane, future)
futures.Set(s.InfraMachinePool, future)
}

// GetLongRunningOperationState will get the future on the AzureManagedControlPlane status.
func (s *ManagedMachinePoolScope) GetLongRunningOperationState(name, service string) *infrav1.Future {
return futures.Get(s.ControlPlane, name, service)
// GetLongRunningOperationState will get the future on the AzureManagedMachinePool status.
func (s *ManagedMachinePoolScope) GetLongRunningOperationState(name, service, futureType string) *infrav1.Future {
return futures.Get(s.InfraMachinePool, name, service, futureType)
}

// DeleteLongRunningOperationState will delete the future from the AzureManagedControlPlane status.
func (s *ManagedMachinePoolScope) DeleteLongRunningOperationState(name, service string) {
futures.Delete(s.ControlPlane, name, service)
// DeleteLongRunningOperationState will delete the future from the AzureManagedMachinePool status.
func (s *ManagedMachinePoolScope) DeleteLongRunningOperationState(name, service, futureType string) {
futures.Delete(s.InfraMachinePool, name, service, futureType)
}

// UpdateDeleteStatus updates a condition on the AzureManagedControlPlane status after a DELETE operation.
Expand Down
7 changes: 1 addition & 6 deletions azure/services/agentpools/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,7 @@ func (ac *azureClient) IsDone(ctx context.Context, future azureautorest.FutureAP
ctx, _, done := tele.StartSpanWithLogger(ctx, "agentpools.azureClient.IsDone")
defer done()

isDone, err = future.DoneWithContext(ctx, ac.agentpools)
if err != nil {
return false, errors.Wrap(err, "failed checking if the operation was complete")
}

return isDone, nil
return future.DoneWithContext(ctx, ac.agentpools)
}

// Result fetches the result of a long-running operation future.
Expand Down
16 changes: 8 additions & 8 deletions azure/services/agentpools/mock_agentpools/agentpools_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 26 additions & 19 deletions azure/services/async/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ func New(scope FutureScope, createClient Creator, deleteClient Deleter) *Service

// processOngoingOperation is a helper function that will process an ongoing operation to check if it is done.
// If it is not done, it will return a transient error.
func processOngoingOperation(ctx context.Context, scope FutureScope, client FutureHandler, resourceName string, serviceName string) (result interface{}, err error) {
func processOngoingOperation(ctx context.Context, scope FutureScope, client FutureHandler, resourceName string, serviceName string, futureType string) (result interface{}, err error) {
ctx, log, done := tele.StartSpanWithLogger(ctx, "async.Service.processOngoingOperation")
defer done()

future := scope.GetLongRunningOperationState(resourceName, serviceName)
future := scope.GetLongRunningOperationState(resourceName, serviceName, futureType)
if future == nil {
log.V(2).Info("no long running operation found", "service", serviceName, "resource", resourceName)
return nil, nil
Expand All @@ -61,31 +61,36 @@ func processOngoingOperation(ctx context.Context, scope FutureScope, client Futu
// Reset the future data to avoid getting stuck in a bad loop.
// In theory, this should never happen, but if for some reason the future that is already stored in Status isn't properly formatted
// and we don't reset it we would be stuck in an infinite loop trying to parse it.
scope.DeleteLongRunningOperationState(resourceName, serviceName)
scope.DeleteLongRunningOperationState(resourceName, serviceName, futureType)
return nil, errors.Wrap(err, "could not decode future data, resetting long-running operation state")
}

isDone, err := client.IsDone(ctx, sdkFuture)
if err != nil {
return nil, errors.Wrap(err, "failed checking if the operation was complete")
}

// Assume that if isDone is true, then we successfully checked that the
// operation was complete even if err is non-nil. Assume the error in that
// case is unrelated and will be captured in Result below.
if !isDone {
if err != nil {
return nil, errors.Wrap(err, "failed checking if the operation was complete")
}

// Operation is still in progress, update conditions and requeue.
log.V(2).Info("long running operation is still ongoing", "service", serviceName, "resource", resourceName)
return nil, azure.WithTransientError(azure.NewOperationNotDoneError(future), retryAfter(sdkFuture))
}
if err != nil {
log.V(2).Error(err, "error checking long running operation status after it finished")
}

// Once the operation is done, we can delete the long running operation state.
// If the operation failed, this will allow it to be retried during the next reconciliation.
// If the resource is not found, we also reset the long-running operation state so we can attempt to create it again.
// This can happen if the resource was deleted by another process before we could get the result.
scope.DeleteLongRunningOperationState(resourceName, serviceName, futureType)

// Resource has been created/deleted/updated.
log.V(2).Info("long running operation has completed", "service", serviceName, "resource", resourceName)
result, err = client.Result(ctx, sdkFuture, future.Type)
if err == nil || azure.ResourceNotFound(err) {
// Once we have the result, we can delete the long running operation state.
// If the resource is not found, we also reset the long-running operation state so we can attempt to create it again.
// This can happen if the resource was deleted by another process before we could get the result.
scope.DeleteLongRunningOperationState(resourceName, serviceName)
}
return result, err
return client.Result(ctx, sdkFuture, future.Type)
}

// CreateResource implements the logic for creating a resource Asynchronously.
Expand All @@ -95,11 +100,12 @@ func (s *Service) CreateResource(ctx context.Context, spec azure.ResourceSpecGet

resourceName := spec.ResourceName()
rgName := spec.ResourceGroupName()
futureType := infrav1.PutFuture

// Check if there is an ongoing long running operation.
future := s.Scope.GetLongRunningOperationState(resourceName, serviceName)
future := s.Scope.GetLongRunningOperationState(resourceName, serviceName, futureType)
if future != nil {
return processOngoingOperation(ctx, s.Scope, s.Creator, resourceName, serviceName)
return processOngoingOperation(ctx, s.Scope, s.Creator, resourceName, serviceName, futureType)
}

// Get the resource if it already exists, and use it to construct the desired resource parameters.
Expand Down Expand Up @@ -146,11 +152,12 @@ func (s *Service) DeleteResource(ctx context.Context, spec azure.ResourceSpecGet

resourceName := spec.ResourceName()
rgName := spec.ResourceGroupName()
futureType := infrav1.DeleteFuture

// Check if there is an ongoing long running operation.
future := s.Scope.GetLongRunningOperationState(resourceName, serviceName)
future := s.Scope.GetLongRunningOperationState(resourceName, serviceName, futureType)
if future != nil {
_, err := processOngoingOperation(ctx, s.Scope, s.Deleter, resourceName, serviceName)
_, err := processOngoingOperation(ctx, s.Scope, s.Deleter, resourceName, serviceName, futureType)
return err
}

Expand Down
Loading

0 comments on commit a38912d

Please sign in to comment.