Skip to content

Commit

Permalink
Add auto-buildable flow
Browse files Browse the repository at this point in the history
  • Loading branch information
wilwell committed Jan 15, 2025
1 parent d32b88a commit 97b300a
Show file tree
Hide file tree
Showing 6 changed files with 635 additions and 68 deletions.
14 changes: 12 additions & 2 deletions api/v1/ytsaurus_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/consts"
)

// EmbeddedPersistentVolumeClaim is an embedded version of k8s.io/api/core/v1.PersistentVolumeClaim.
Expand Down Expand Up @@ -723,6 +725,7 @@ const (
UpdateStateWaitingForYqlaUpdatingPrepare UpdateState = "WaitingForYqlaUpdatingPrepare"
UpdateStateWaitingForYqlaUpdate UpdateState = "WaitingForYqlaUpdate"
UpdateStateWaitingForSafeModeDisabled UpdateState = "WaitingForSafeModeDisabled"
UpdateStateFinishing UpdateState = "Finishing"
)

type TabletCellBundleInfo struct {
Expand Down Expand Up @@ -765,8 +768,10 @@ const (

type UpdateStatus struct {
//+kubebuilder:default:=None
State UpdateState `json:"state,omitempty"`
Components []string `json:"components,omitempty"`
State UpdateState `json:"state,omitempty"`
// Deprecated: Use updatingComponents instead.
Components []string `json:"components,omitempty"`
UpdatingComponents []Component `json:"updatingComponents,omitempty"`
// Flow is an internal field that is needed to persist the chosen flow until the end of an update.
// Flow can be on of ""(unspecified), Stateless, Master, TabletNodes, Full and update cluster stage
// executes steps corresponding to that update flow.
Expand All @@ -776,6 +781,11 @@ type UpdateStatus struct {
MasterMonitoringPaths []string `json:"masterMonitoringPaths,omitempty"`
}

type Component struct {
ComponentName string `json:"componentName,omitempty"`
ComponentType consts.ComponentType `json:"componentType,omitempty"`
}

// YtsaurusStatus defines the observed state of Ytsaurus
type YtsaurusStatus struct {
//+kubebuilder:default:=Created
Expand Down
20 changes: 20 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

80 changes: 24 additions & 56 deletions controllers/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,12 +430,6 @@ func (r *YtsaurusReconciler) handleTabletNodesOnly(
return nil, nil
}

type updateMeta struct {
flow ytv1.UpdateFlow
// componentNames is a list of component names that will be updated. It is built according to the update selector.
componentNames []string
}

func canUpdateComponent(selector ytv1.UpdateSelector, component consts.ComponentType) bool {
switch selector {
case ytv1.UpdateSelectorNothing:
Expand Down Expand Up @@ -465,10 +459,10 @@ func canUpdateComponent(selector ytv1.UpdateSelector, component consts.Component
}
}

// chooseUpdateFlow considers spec and decides if operator should proceed with update or block.
// chooseUpdatingComponents considers spec and decides if operator should proceed with update or block.
// Block case is indicated with non-empty blockMsg.
// If update is not blocked, updateMeta containing a chosen flow and the component names to update returned.
func chooseUpdateFlow(spec ytv1.YtsaurusSpec, needUpdate []components.Component) (meta updateMeta, blockMsg string) {
func chooseUpdatingComponents(spec ytv1.YtsaurusSpec, needUpdate []components.Component) (components []ytv1.Component, blockMsg string) {
configuredSelector := spec.UpdateSelector
if configuredSelector == ytv1.UpdateSelectorUnspecified {
if spec.EnableFullUpdate {
Expand All @@ -478,60 +472,47 @@ func chooseUpdateFlow(spec ytv1.YtsaurusSpec, needUpdate []components.Component)
}
}

var canUpdate []string
var canUpdate []ytv1.Component
var cannotUpdate []string
needFullUpdate := false

for _, comp := range needUpdate {
componentType := comp.GetType()
componentName := comp.GetName()
if canUpdateComponent(configuredSelector, componentType) {
canUpdate = append(canUpdate, componentName)
component := ytv1.Component{
ComponentName: comp.GetName(),
ComponentType: comp.GetType(),
}
if canUpdateComponent(configuredSelector, component.ComponentType) {
canUpdate = append(canUpdate, component)
} else {
cannotUpdate = append(cannotUpdate, componentName)
cannotUpdate = append(cannotUpdate, component.ComponentName)
}
if !canUpdateComponent(ytv1.UpdateSelectorStatelessOnly, componentType) && componentType != consts.DataNodeType {
if !canUpdateComponent(ytv1.UpdateSelectorStatelessOnly, component.ComponentType) && component.ComponentType != consts.DataNodeType {
needFullUpdate = true
}
}

if len(canUpdate) == 0 {
if len(cannotUpdate) != 0 {
return updateMeta{}, fmt.Sprintf("All components allowed by updateSelector are uptodate, update of {%s} is not allowed", strings.Join(cannotUpdate, ", "))
return nil, fmt.Sprintf("All components allowed by updateSelector are uptodate, update of {%s} is not allowed", strings.Join(cannotUpdate, ", "))
}
return updateMeta{}, "All components are uptodate"
return nil, "All components are uptodate"
}

switch configuredSelector {
case ytv1.UpdateSelectorEverything:
if needFullUpdate {
return updateMeta{
flow: ytv1.UpdateFlowFull,
componentNames: nil,
}, ""
return nil, ""
} else {
return updateMeta{
flow: ytv1.UpdateFlowStateless,
componentNames: canUpdate,
}, ""
return canUpdate, ""
}
case ytv1.UpdateSelectorMasterOnly:
return updateMeta{
flow: ytv1.UpdateFlowMaster,
componentNames: canUpdate,
}, ""
return canUpdate, ""
case ytv1.UpdateSelectorTabletNodesOnly:
return updateMeta{
flow: ytv1.UpdateFlowTabletNodes,
componentNames: canUpdate,
}, ""
return canUpdate, ""
case ytv1.UpdateSelectorDataNodesOnly, ytv1.UpdateSelectorExecNodesOnly, ytv1.UpdateSelectorStatelessOnly:
return updateMeta{
flow: ytv1.UpdateFlowStateless,
componentNames: canUpdate,
}, ""
return canUpdate, ""
default:
return updateMeta{}, fmt.Sprintf("Unexpected update selector %s", configuredSelector)
return nil, fmt.Sprintf("Unexpected update selector %s", configuredSelector)
}
}

Expand Down Expand Up @@ -587,37 +568,24 @@ func (r *YtsaurusReconciler) Sync(ctx context.Context, resource *ytv1.Ytsaurus)
needUpdateNames = append(needUpdateNames, c.GetName())
}
logger = logger.WithValues("componentsForUpdateAll", needUpdateNames)
meta, blockMsg := chooseUpdateFlow(ytsaurus.GetResource().Spec, needUpdate)
updatingComponents, blockMsg := chooseUpdatingComponents(ytsaurus.GetResource().Spec, needUpdate)
if blockMsg != "" {
logger.Info(blockMsg)
return ctrl.Result{Requeue: true}, nil
}
logger.Info("Ytsaurus needs components update",
"componentsForUpdateSelected", meta.componentNames,
"flow", meta.flow,
)
"componentsForUpdateSelected", updatingComponents)
ytsaurus.SyncObservedGeneration()
err = ytsaurus.SaveUpdatingClusterState(ctx, meta.flow, meta.componentNames)
err = ytsaurus.SaveUpdatingClusterState(ctx, updatingComponents)
if err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{Requeue: true}, nil
}

case ytv1.ClusterStateUpdating:
var result *ctrl.Result
var err error

switch ytsaurus.GetUpdateFlow() {
case ytv1.UpdateFlowFull:
result, err = r.handleEverything(ctx, ytsaurus, componentManager)
case ytv1.UpdateFlowStateless:
result, err = r.handleStateless(ctx, ytsaurus, componentManager)
case ytv1.UpdateFlowMaster:
result, err = r.handleMasterOnly(ctx, ytsaurus, componentManager)
case ytv1.UpdateFlowTabletNodes:
result, err = r.handleTabletNodesOnly(ctx, ytsaurus, componentManager)
}
updatingComponents := ytsaurus.GetUpdatingComponents()
result, err := buildAndExecuteFlow(ctx, ytsaurus, componentManager, updatingComponents)

if result != nil {
return *result, err
Expand Down
Loading

0 comments on commit 97b300a

Please sign in to comment.