Skip to content

Commit

Permalink
Decouple controller flags from CLIOptions struct
Browse files Browse the repository at this point in the history
This eliminates one more global variable. Make the single and
enable-worker flags an implementation detail of flag handling, and
provide an enum to the outside world instead. Draw a clear line between
controller and worker flags. Instead of accessing the entire controller
flags struct in the worker, have a bespoke interface that acts as an
integration point between a worker and its embedding controller, if any.

Signed-off-by: Tom Wieczorek <twieczorek@mirantis.com>
  • Loading branch information
twz123 committed Jan 15, 2025
1 parent 7edf778 commit ebdd6f4
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 76 deletions.
99 changes: 55 additions & 44 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ import (
type command config.CLIOptions

func NewControllerCmd() *cobra.Command {
var ignorePreFlightChecks bool
var (
controllerFlags config.ControllerOptions
ignorePreFlightChecks bool
)

cmd := &cobra.Command{
Use: "controller [join-token]",
Expand Down Expand Up @@ -103,34 +106,35 @@ func NewControllerCmd() *cobra.Command {
if c.TokenArg != "" && c.TokenFile != "" {
return errors.New("you can only pass one token argument either as a CLI argument 'k0s controller [join-token]' or as a flag 'k0s controller --token-file [path]'")
}
if err := c.ControllerOptions.Normalize(); err != nil {
if err := controllerFlags.Normalize(); err != nil {
return err
}

if err := (&sysinfo.K0sSysinfoSpec{
ControllerRoleEnabled: true,
WorkerRoleEnabled: c.SingleNode || c.EnableWorker,
WorkerRoleEnabled: controllerFlags.Mode().WorkloadsEnabled(),
DataDir: c.K0sVars.DataDir,
}).RunPreFlightChecks(ignorePreFlightChecks); !ignorePreFlightChecks && err != nil {
return err
}

ctx, cancel := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
defer cancel()
return c.start(ctx)
return c.start(ctx, &controllerFlags)
},
}

flags := cmd.Flags()
flags.AddFlagSet(config.GetPersistentFlagSet())
flags.AddFlagSet(config.GetControllerFlags())
flags.AddFlagSet(config.GetControllerFlags(&controllerFlags))
flags.AddFlagSet(config.GetWorkerFlags())
flags.AddFlagSet(config.FileInputFlag())
flags.BoolVar(&ignorePreFlightChecks, "ignore-pre-flight-checks", false, "continue even if pre-flight checks fail")

return cmd
}

func (c *command) start(ctx context.Context) error {
func (c *command) start(ctx context.Context, flags *config.ControllerOptions) error {
perfTimer := performance.NewTimer("controller-start").Buffer().Start()

nodeConfig, err := c.K0sVars.NodeConfig()
Expand Down Expand Up @@ -240,6 +244,8 @@ func (c *command) start(ctx context.Context) error {
logrus.Infof("using storage backend %s", nodeConfig.Spec.Storage.Type)
nodeComponents.Add(ctx, storageBackend)

controllerMode := flags.Mode()

// Assume a single active controller during startup
numActiveControllers := value.NewLatest[uint](1)

Expand All @@ -249,10 +255,10 @@ func (c *command) start(ctx context.Context) error {
})

enableK0sEndpointReconciler := nodeConfig.Spec.API.ExternalAddress != "" &&
!slices.Contains(c.DisableComponents, constant.APIEndpointReconcilerComponentName)
!slices.Contains(flags.DisableComponents, constant.APIEndpointReconcilerComponentName)

if cplbCfg := nodeConfig.Spec.Network.ControlPlaneLoadBalancing; cplbCfg != nil && cplbCfg.Enabled {
if c.SingleNode {
if controllerMode == config.SingleNodeMode {
return errors.New("control plane load balancing cannot be used in a single-node cluster")
}

Expand All @@ -271,7 +277,7 @@ func (c *command) start(ctx context.Context) error {
})
}

enableKonnectivity := !c.SingleNode && !slices.Contains(c.DisableComponents, constant.KonnectivityServerComponentName)
enableKonnectivity := controllerMode != config.SingleNodeMode && !slices.Contains(flags.DisableComponents, constant.KonnectivityServerComponentName)

if enableKonnectivity {
nodeComponents.Add(ctx, &controller.Konnectivity{
Expand All @@ -293,7 +299,7 @@ func (c *command) start(ctx context.Context) error {
DisableEndpointReconciler: enableK0sEndpointReconciler,
})

if !c.SingleNode {
if controllerMode != config.SingleNodeMode {
nodeComponents.Add(ctx, &controller.K0sControllersLeaseCounter{
InvocationID: c.K0sVars.InvocationID,
ClusterConfig: nodeConfig,
Expand All @@ -308,7 +314,7 @@ func (c *command) start(ctx context.Context) error {
}

// One leader elector per controller
if !c.SingleNode {
if controllerMode != config.SingleNodeMode {
// The name used to be hardcoded in the component itself
// At some point we need to rename this.
leaderElector = leaderelector.NewLeasePool(c.K0sVars.InvocationID, adminClientFactory, "k0s-endpoint-reconciler")
Expand All @@ -317,7 +323,7 @@ func (c *command) start(ctx context.Context) error {
}
nodeComponents.Add(ctx, leaderElector)

if !slices.Contains(c.DisableComponents, constant.ApplierManagerComponentName) {
if !slices.Contains(flags.DisableComponents, constant.ApplierManagerComponentName) {
nodeComponents.Add(ctx, &applier.Manager{
K0sVars: c.K0sVars,
KubeClientFactory: adminClientFactory,
Expand All @@ -328,26 +334,26 @@ func (c *command) start(ctx context.Context) error {
})
}

if !c.SingleNode && !slices.Contains(c.DisableComponents, constant.ControlAPIComponentName) {
if controllerMode != config.SingleNodeMode && !slices.Contains(flags.DisableComponents, constant.ControlAPIComponentName) {
nodeComponents.Add(ctx, &controller.K0SControlAPI{
ConfigPath: c.CfgFile,
K0sVars: c.K0sVars,
})
}

if !slices.Contains(c.DisableComponents, constant.CsrApproverComponentName) {
if !slices.Contains(flags.DisableComponents, constant.CsrApproverComponentName) {
nodeComponents.Add(ctx, controller.NewCSRApprover(nodeConfig,
leaderElector,
adminClientFactory))
}

if c.EnableK0sCloudProvider {
if flags.EnableK0sCloudProvider {
nodeComponents.Add(
ctx,
controller.NewK0sCloudProvider(
c.K0sVars.AdminKubeConfigPath,
c.K0sCloudProviderUpdateFrequency,
c.K0sCloudProviderPort,
flags.K0sCloudProviderUpdateFrequency,
flags.K0sCloudProviderPort,
),
)
}
Expand All @@ -358,8 +364,8 @@ func (c *command) start(ctx context.Context) error {
Role: "controller",
Args: os.Args,
Version: build.Version,
Workloads: c.SingleNode || c.EnableWorker,
SingleNode: c.SingleNode,
Workloads: controllerMode.WorkloadsEnabled(),
SingleNode: controllerMode == config.SingleNodeMode,
K0sVars: c.K0sVars,
ClusterConfig: nodeConfig,
},
Expand Down Expand Up @@ -416,7 +422,7 @@ func (c *command) start(ctx context.Context) error {

var configSource clusterconfig.ConfigSource
// For backwards compatibility, use file as config source by default
if c.EnableDynamicConfig {
if flags.EnableDynamicConfig {
clusterComponents.Add(ctx, controller.NewClusterConfigInitializer(
adminClientFactory,
leaderElector,
Expand All @@ -437,7 +443,7 @@ func (c *command) start(ctx context.Context) error {
configSource,
))

if !slices.Contains(c.DisableComponents, constant.HelmComponentName) {
if !slices.Contains(flags.DisableComponents, constant.HelmComponentName) {
helmSaver, err := controller.NewManifestsSaver("helm", c.K0sVars.DataDir)
if err != nil {
return fmt.Errorf("failed to initialize helm manifests saver: %w", err)
Expand All @@ -450,7 +456,7 @@ func (c *command) start(ctx context.Context) error {
))
}

if !slices.Contains(c.DisableComponents, constant.AutopilotComponentName) {
if !slices.Contains(flags.DisableComponents, constant.AutopilotComponentName) {
logrus.Debug("starting manifest saver")
manifestsSaver, err := controller.NewManifestsSaver("autopilot", c.K0sVars.DataDir)
if err != nil {
Expand All @@ -469,19 +475,19 @@ func (c *command) start(ctx context.Context) error {
))
}

if !slices.Contains(c.DisableComponents, constant.KubeProxyComponentName) {
if !slices.Contains(flags.DisableComponents, constant.KubeProxyComponentName) {
clusterComponents.Add(ctx, controller.NewKubeProxy(c.K0sVars, nodeConfig))
}

if !slices.Contains(c.DisableComponents, constant.CoreDNSComponentname) {
if !slices.Contains(flags.DisableComponents, constant.CoreDNSComponentname) {
coreDNS, err := controller.NewCoreDNS(c.K0sVars, adminClientFactory, nodeConfig)
if err != nil {
return fmt.Errorf("failed to create CoreDNS reconciler: %w", err)
}
clusterComponents.Add(ctx, coreDNS)
}

if !slices.Contains(c.DisableComponents, constant.NetworkProviderComponentName) {
if !slices.Contains(flags.DisableComponents, constant.NetworkProviderComponentName) {
logrus.Infof("Creating network reconcilers")

calicoSaver, err := controller.NewManifestsSaver("calico", c.K0sVars.DataDir)
Expand All @@ -497,7 +503,7 @@ func (c *command) start(ctx context.Context) error {
return fmt.Errorf("failed to create windows manifests saver: %w", err)
}
clusterComponents.Add(ctx, controller.NewCalico(c.K0sVars, calicoInitSaver, calicoSaver))
if !slices.Contains(c.DisableComponents, constant.WindowsNodeComponentName) {
if !slices.Contains(flags.DisableComponents, constant.WindowsNodeComponentName) {
clusterComponents.Add(ctx, controller.NewWindowsStackComponent(c.K0sVars, adminClientFactory, windowsStackSaver))
}
kubeRouterSaver, err := controller.NewManifestsSaver("kuberouter", c.K0sVars.DataDir)
Expand All @@ -507,11 +513,11 @@ func (c *command) start(ctx context.Context) error {
clusterComponents.Add(ctx, controller.NewKubeRouter(c.K0sVars, kubeRouterSaver))
}

if !slices.Contains(c.DisableComponents, constant.MetricsServerComponentName) {
if !slices.Contains(flags.DisableComponents, constant.MetricsServerComponentName) {
clusterComponents.Add(ctx, controller.NewMetricServer(c.K0sVars, adminClientFactory))
}

if c.EnableMetricsScraper {
if flags.EnableMetricsScraper {
metricsSaver, err := controller.NewManifestsSaver("metrics", c.K0sVars.DataDir)
if err != nil {
return fmt.Errorf("failed to create metrics manifests saver: %w", err)
Expand All @@ -523,7 +529,7 @@ func (c *command) start(ctx context.Context) error {
clusterComponents.Add(ctx, metrics)
}

if !slices.Contains(c.DisableComponents, constant.WorkerConfigComponentName) {
if !slices.Contains(flags.DisableComponents, constant.WorkerConfigComponentName) {
// Create new dedicated leasepool for worker config reconciler
leaseName := fmt.Sprintf("k0s-%s-%s", constant.WorkerConfigComponentName, constant.KubernetesMajorMinorVersion)
workerConfigLeasePool := leaderelector.NewLeasePool(c.K0sVars.InvocationID, adminClientFactory, leaseName)
Expand All @@ -536,11 +542,11 @@ func (c *command) start(ctx context.Context) error {
clusterComponents.Add(ctx, reconciler)
}

if !slices.Contains(c.DisableComponents, constant.SystemRbacComponentName) {
if !slices.Contains(flags.DisableComponents, constant.SystemRbacComponentName) {
clusterComponents.Add(ctx, controller.NewSystemRBAC(c.K0sVars.ManifestsDir))
}

if !slices.Contains(c.DisableComponents, constant.NodeRoleComponentName) {
if !slices.Contains(flags.DisableComponents, constant.NodeRoleComponentName) {
clusterComponents.Add(ctx, controller.NewNodeRole(c.K0sVars, adminClientFactory))
}

Expand All @@ -553,21 +559,21 @@ func (c *command) start(ctx context.Context) error {
})
}

if !slices.Contains(c.DisableComponents, constant.KubeSchedulerComponentName) {
if !slices.Contains(flags.DisableComponents, constant.KubeSchedulerComponentName) {
clusterComponents.Add(ctx, &controller.Scheduler{
LogLevel: c.LogLevels.KubeScheduler,
K0sVars: c.K0sVars,
SingleNode: c.SingleNode,
SingleNode: controllerMode == config.SingleNodeMode,
})
}

if !slices.Contains(c.DisableComponents, constant.KubeControllerManagerComponentName) {
if !slices.Contains(flags.DisableComponents, constant.KubeControllerManagerComponentName) {
clusterComponents.Add(ctx, &controller.Manager{
LogLevel: c.LogLevels.KubeControllerManager,
K0sVars: c.K0sVars,
SingleNode: c.SingleNode,
SingleNode: controllerMode == config.SingleNodeMode,
ServiceClusterIPRange: nodeConfig.Spec.Network.BuildServiceCIDR(nodeConfig.Spec.API.Address),
ExtraArgs: c.KubeControllerManagerExtraArgs,
ExtraArgs: flags.KubeControllerManagerExtraArgs,
})
}

Expand All @@ -585,7 +591,7 @@ func (c *command) start(ctx context.Context) error {
K0sVars: c.K0sVars,
KubeletExtraArgs: c.KubeletExtraArgs,
AdminClientFactory: adminClientFactory,
EnableWorker: c.EnableWorker,
Workloads: controllerMode.WorkloadsEnabled(),
})

restConfig, err := adminClientFactory.GetRESTConfig()
Expand Down Expand Up @@ -623,9 +629,9 @@ func (c *command) start(ctx context.Context) error {
}
}()

if c.EnableWorker {
if controllerMode.WorkloadsEnabled() {
perfTimer.Checkpoint("starting-worker")
if err := c.startWorker(ctx, c.WorkerProfile, nodeConfig); err != nil {
if err := c.startWorker(ctx, flags, nodeConfig); err != nil {
logrus.WithError(err).Error("Failed to start controller worker")
} else {
perfTimer.Checkpoint("started-worker")
Expand All @@ -644,7 +650,7 @@ func (c *command) start(ctx context.Context) error {
return nil
}

func (c *command) startWorker(ctx context.Context, profile string, nodeConfig *v1beta1.ClusterConfig) error {
func (c *command) startWorker(ctx context.Context, opts *config.ControllerOptions, nodeConfig *v1beta1.ClusterConfig) error {
var bootstrapConfig string
if !file.Exists(c.K0sVars.KubeletAuthConfigPath) {
// wait for controller to start up
Expand Down Expand Up @@ -679,15 +685,20 @@ func (c *command) startWorker(ctx context.Context, profile string, nodeConfig *v
// possibly other args won't get messed up.
wc := workercmd.Command(*(*config.CLIOptions)(c))
wc.TokenArg = bootstrapConfig
wc.WorkerProfile = profile
wc.Labels = append(wc.Labels, fields.OneTermEqualSelector(constant.K0SNodeRoleLabel, "control-plane").String())
wc.DisableIPTables = true
if !c.SingleNode && !c.NoTaints {
if opts.Mode() == config.ControllerPlusWorkerMode && !opts.NoTaints {
key := path.Join(constant.NodeRoleLabelNamespace, "master")
taint := fields.OneTermEqualSelector(key, ":NoSchedule")
wc.Taints = append(wc.Taints, taint.String())
}
return wc.Start(ctx)
return wc.Start(ctx, (*embeddingController)(opts))
}

// embeddingController adapts [config.ControllerOptions] to the
// [workercmd.EmbeddingController] interface, exposing only what an
// embedded worker needs to know about its embedding controller.
type embeddingController config.ControllerOptions

// IsSingleNode implements [workercmd.EmbeddingController].
func (c *embeddingController) IsSingleNode() bool {
	opts := (*config.ControllerOptions)(c)
	return opts.Mode() == config.SingleNodeMode
}

// If we've got an etcd data directory in place for embedded etcd, or a ca for
Expand Down
3 changes: 2 additions & 1 deletion cmd/install/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ With the controller subcommand you can setup a single node cluster by running:

flags := cmd.Flags()
flags.AddFlagSet(config.GetPersistentFlagSet())
flags.AddFlagSet(config.GetControllerFlags())
flags.AddFlagSet(config.GetControllerFlags(&config.ControllerOptions{}))
flags.AddFlagSet(config.GetWorkerFlags())
flags.AddFlagSet(config.FileInputFlag())

return cmd
}
15 changes: 10 additions & 5 deletions cmd/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ import (

type Command config.CLIOptions

// EmbeddingController is the interface between an embedded worker and its
// embedding controller. A nil EmbeddingController passed to [Command.Start]
// indicates that the worker is running standalone, i.e. not embedded in a
// controller.
type EmbeddingController interface {
	// IsSingleNode reports whether the embedding controller is running in
	// single-node mode.
	IsSingleNode() bool
}

func NewWorkerCmd() *cobra.Command {
var ignorePreFlightChecks bool

Expand Down Expand Up @@ -89,7 +94,7 @@ func NewWorkerCmd() *cobra.Command {
ctx, cancel := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
defer cancel()

return c.Start(ctx)
return c.Start(ctx, nil)
},
}

Expand All @@ -102,7 +107,7 @@ func NewWorkerCmd() *cobra.Command {
}

// Start starts the worker components based on the given [config.CLIOptions].
func (c *Command) Start(ctx context.Context) error {
func (c *Command) Start(ctx context.Context, controller EmbeddingController) error {
if err := worker.BootstrapKubeletKubeconfig(ctx, c.K0sVars, &c.WorkerOptions); err != nil {
return err
}
Expand All @@ -123,7 +128,7 @@ func (c *Command) Start(ctx context.Context) error {
var staticPods worker.StaticPods

if workerConfig.NodeLocalLoadBalancing.IsEnabled() {
if c.SingleNode {
if controller != nil && controller.IsSingleNode() {
return errors.New("node-local load balancing cannot be used in a single-node cluster")
}

Expand All @@ -148,7 +153,7 @@ func (c *Command) Start(ctx context.Context) error {
c.WorkerProfile = "default-windows"
}

if !c.DisableIPTables {
if controller == nil {
componentManager.Add(ctx, &iptables.Component{
IPTablesMode: c.WorkerOptions.IPTablesMode,
BinDir: c.K0sVars.BinDir,
Expand All @@ -171,7 +176,7 @@ func (c *Command) Start(ctx context.Context) error {

certManager := worker.NewCertificateManager(kubeletKubeconfigPath)

addPlatformSpecificComponents(ctx, componentManager, c.K0sVars, &c.ControllerOptions, certManager)
addPlatformSpecificComponents(ctx, componentManager, c.K0sVars, controller, certManager)

// extract needed components
if err := componentManager.Init(ctx); err != nil {
Expand Down
Loading

0 comments on commit ebdd6f4

Please sign in to comment.