Skip to content

Commit

Permalink
Add update flow tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wilwell committed Jan 29, 2025
1 parent 7603562 commit b0cbce7
Show file tree
Hide file tree
Showing 5 changed files with 371 additions and 28 deletions.
10 changes: 7 additions & 3 deletions controllers/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,14 @@ func (r *YtsaurusReconciler) Sync(ctx context.Context, resource *ytv1.Ytsaurus)

case ytv1.ClusterStateUpdating:
updatingComponents := ytsaurus.GetUpdatingComponents()
result, err := buildAndExecuteFlow(ctx, ytsaurus, componentManager, updatingComponents)
progressed, err := buildAndExecuteFlow(ctx, ytsaurus, componentManager, updatingComponents)

if result != nil {
return *result, err
if err != nil {
return ctrl.Result{}, err
}

if progressed {
return ctrl.Result{Requeue: true}, err
}

case ytv1.ClusterStateCancelUpdate:
Expand Down
49 changes: 26 additions & 23 deletions controllers/update_flow_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"fmt"

ctrl "sigs.k8s.io/controller-runtime"

ytv1 "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1"
apiProxy "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/apiproxy"
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/components"
Expand All @@ -22,11 +20,20 @@ const (
stepResultMarkUnhappy stepResultMark = "unhappy"
)

func buildAndExecuteFlow(ctx context.Context, ytsaurus *apiProxy.Ytsaurus, componentManager *ComponentManager, updatingComponents []ytv1.Component) (*ctrl.Result, error) {
tree := buildFlowTree(updatingComponents, componentManager.allComponents)
_, err := tree.execute(ctx, ytsaurus, componentManager)
return &ctrl.Result{Requeue: true}, err
func buildAndExecuteFlow(ctx context.Context, ytsaurus *apiProxy.Ytsaurus, componentManager *ComponentManager, updatingComponents []ytv1.Component) (bool, error) {
allComponents := convertToYtComponents(componentManager.allComponents)
tree := buildFlowTree(updatingComponents, allComponents)
return tree.execute(ctx, ytsaurus, componentManager)
}

func convertToYtComponents(components []components.Component) []ytv1.Component {
result := make([]ytv1.Component, len(components))
for i, c := range components {
result[i] = ytv1.Component{
ComponentType: c.GetType(),
}
}
return result
}

var terminateTransitions = map[ytv1.UpdateState]ytv1.ClusterState{
Expand All @@ -42,7 +49,6 @@ func flowCheckStatusCondition(conditionName string) flowCondition {
}
return stepResultMarkUnsatisfied
}

}

type flowStep struct {
Expand Down Expand Up @@ -101,15 +107,15 @@ type flowTree struct {
index map[ytv1.UpdateState]*flowStep
head *flowStep
tail *flowStep

deferredChain ytv1.UpdateState
}

func newFlowTree(head *flowStep) *flowTree {
return &flowTree{
index: make(map[ytv1.UpdateState]*flowStep),
head: head,
tail: head,
index: map[ytv1.UpdateState]*flowStep{
head.updateState: head,
},
head: head,
tail: head,
}
}

Expand All @@ -122,6 +128,7 @@ func (f *flowTree) execute(ctx context.Context, ytsaurus *apiProxy.Ytsaurus, com
mark := currentStep.checkCondition(ctx, ytsaurus, componentManager)
// condition is not met, wait for the next update
if mark == stepResultMarkUnsatisfied {
ytsaurus.LogUpdate(ctx, fmt.Sprintf("Update flow: condition not met for %s", currentState))
return false, nil
}

Expand Down Expand Up @@ -153,12 +160,6 @@ func (f *flowTree) execute(ctx context.Context, ytsaurus *apiProxy.Ytsaurus, com
}

func (f *flowTree) chain(steps ...*flowStep) *flowTree {
if f.deferredChain != "" {
if len(steps) != 0 {
f.index[f.deferredChain].chain(steps[0])
f.deferredChain = ""
}
}
for _, step := range steps {
f.index[step.updateState] = step
f.tail.chain(step)
Expand Down Expand Up @@ -206,6 +207,7 @@ var flowConditions = map[ytv1.UpdateState]flowCondition{
ytv1.UpdateStateWaitingForYqlaUpdatingPrepare: flowCheckStatusCondition(consts.ConditionYqlaPreparedForUpdating),
ytv1.UpdateStateWaitingForYqlaUpdate: flowCheckStatusCondition(consts.ConditionYqlaUpdated),
ytv1.UpdateStateWaitingForSafeModeDisabled: flowCheckStatusCondition(consts.ConditionSafeModeDisabled),
ytv1.UpdateStateWaitingForMasterExitReadOnly: flowCheckStatusCondition(consts.ConditionMasterExitedReadOnly),
ytv1.UpdateStateWaitingForPodsRemoval: func(ctx context.Context, ytsaurus *apiProxy.Ytsaurus, componentManager *ComponentManager) stepResultMark {
if componentManager.areNonMasterPodsRemoved() {
return stepResultMarkHappy
Expand All @@ -232,7 +234,7 @@ var flowConditions = map[ytv1.UpdateState]flowCondition{
},
}

func buildFlowTree(updatingComponents []ytv1.Component, allComponents []components.Component) *flowTree {
func buildFlowTree(updatingComponents []ytv1.Component, allComponents []ytv1.Component) *flowTree {
st := newSimpleStep
head := st(ytv1.UpdateStateNone)
tree := newFlowTree(head)
Expand All @@ -245,7 +247,8 @@ func buildFlowTree(updatingComponents []ytv1.Component, allComponents []componen
updYqlAgent := hasComponent(updatingComponents, allComponents, consts.YqlAgentType)

// TODO: if validation conditions can be not mentioned here or needed
tree.chain(
tree.chainIf(
updMasterOrTablet,
newConditionalForkStep(
ytv1.UpdateStatePossibilityCheck,
// This is the unhappy path.
Expand Down Expand Up @@ -297,10 +300,10 @@ func buildFlowTree(updatingComponents []ytv1.Component, allComponents []componen
return tree
}

func hasComponent(updatingComponents []ytv1.Component, allComponents []components.Component, componentType consts.ComponentType) bool {
if updatingComponents == nil {
func hasComponent(updatingComponents []ytv1.Component, allComponents []ytv1.Component, componentType consts.ComponentType) bool {
if len(updatingComponents) == 0 {
for _, component := range allComponents {
if component.GetType() == componentType {
if component.ComponentType == componentType {
return true
}
}
Expand Down
Loading

0 comments on commit b0cbce7

Please sign in to comment.