Skip to content

Commit

Permalink
refactor: all network merge controllers
Browse files Browse the repository at this point in the history
They had lots of common (and broken) code, so use generics to implement
the core logic once, and use that in all kinds of merge controllers.

This removes lots of code duplication.

Fixes flaky unit-tests, and also improves controller performance on
conflicts.

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
  • Loading branch information
smira committed Jan 27, 2025
1 parent ec8c466 commit 807a3cd
Show file tree
Hide file tree
Showing 16 changed files with 328 additions and 793 deletions.
139 changes: 21 additions & 118 deletions internal/app/machined/pkg/controllers/network/address_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,138 +2,41 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

// Package network provides controllers which manage network resources.
//
//nolint:dupl
package network

import (
"context"
"fmt"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"go.uber.org/zap"

"github.com/siderolabs/talos/pkg/machinery/resources/network"
)

// AddressMergeController merges network.AddressSpec in network.ConfigNamespace and produces final network.AddressSpec in network.Namespace.
type AddressMergeController struct{}

// Name implements controller.Controller interface.
func (ctrl *AddressMergeController) Name() string {
return "network.AddressMergeController"
}

// Inputs implements controller.Controller interface.
func (ctrl *AddressMergeController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: network.ConfigNamespaceName,
Type: network.AddressSpecType,
Kind: controller.InputWeak,
},
{
Namespace: network.NamespaceName,
Type: network.AddressSpecType,
Kind: controller.InputDestroyReady,
},
}
}

// Outputs implements controller.Controller interface.
func (ctrl *AddressMergeController) Outputs() []controller.Output {
return []controller.Output{
{
Type: network.AddressSpecType,
Kind: controller.OutputShared,
},
}
}

// Run implements controller.Controller interface.
// NewAddressMergeController initializes a AddressMergeController.
//
//nolint:gocyclo
func (ctrl *AddressMergeController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
}

// list source network configuration resources
list, err := r.List(ctx, resource.NewMetadata(network.ConfigNamespaceName, network.AddressSpecType, "", resource.VersionUndefined))
if err != nil {
return fmt.Errorf("error listing source network addresses: %w", err)
}

// address is allowed as long as it's not duplicate, for duplicate higher layer takes precedence
addresses := map[string]*network.AddressSpec{}

for _, res := range list.Items {
address := res.(*network.AddressSpec) //nolint:forcetypeassert
id := network.AddressID(address.TypedSpec().LinkName, address.TypedSpec().Address)

existing, ok := addresses[id]
if ok && existing.TypedSpec().ConfigLayer > address.TypedSpec().ConfigLayer {
// skip this address, as existing one is higher layer
continue
}

addresses[id] = address
}

conflictsDetected := 0

for id, address := range addresses {
if err = safe.WriterModify(ctx, r, network.NewAddressSpec(network.NamespaceName, id), func(addr *network.AddressSpec) error {
*addr.TypedSpec() = *address.TypedSpec()

return nil
}); err != nil {
if state.IsPhaseConflictError(err) {
// phase conflict, resource is being torn down, skip updating it and trigger reconcile
// later by failing the
conflictsDetected++

delete(addresses, id)
} else {
return fmt.Errorf("error updating resource: %w", err)
}
}
}

// list addresses for cleanup
list, err = r.List(ctx, resource.NewMetadata(network.NamespaceName, network.AddressSpecType, "", resource.VersionUndefined))
if err != nil {
return fmt.Errorf("error listing resources: %w", err)
}

for _, res := range list.Items {
if _, ok := addresses[res.Metadata().ID()]; !ok {
var okToDestroy bool

okToDestroy, err = r.Teardown(ctx, res.Metadata())
if err != nil {
return fmt.Errorf("error cleaning up addresses: %w", err)
// AddressMergeController merges network.AddressSpec in network.ConfigNamespace and produces final network.AddressSpec in network.Namespace.
func NewAddressMergeController() controller.Controller {
return GenericMergeController(
network.ConfigNamespaceName,
network.NamespaceName,
func(logger *zap.Logger, list safe.List[*network.AddressSpec]) map[resource.ID]*network.AddressSpecSpec {
// address is allowed as long as it's not duplicate, for duplicate higher layer takes precedence
addresses := map[resource.ID]*network.AddressSpecSpec{}

for address := range list.All() {
id := network.AddressID(address.TypedSpec().LinkName, address.TypedSpec().Address)

existing, ok := addresses[id]
if ok && existing.ConfigLayer > address.TypedSpec().ConfigLayer {
// skip this address, as existing one is higher layer
continue
}

if okToDestroy {
if err = r.Destroy(ctx, res.Metadata()); err != nil {
return fmt.Errorf("error cleaning up addresses: %w", err)
}
}
addresses[id] = address.TypedSpec()
}
}

if conflictsDetected > 0 {
return fmt.Errorf("%d conflict(s) detected", conflictsDetected)
}

r.ResetRestartBackoff()
}
return addresses
},
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (suite *AddressMergeSuite) SetupTest() {
suite.runtime, err = runtime.NewRuntime(suite.state, zaptest.NewLogger(suite.T()))
suite.Require().NoError(err)

suite.Require().NoError(suite.runtime.RegisterController(&netctrl.AddressMergeController{}))
suite.Require().NoError(suite.runtime.RegisterController(netctrl.NewAddressMergeController()))

suite.startRuntime()
}
Expand Down Expand Up @@ -152,7 +152,7 @@ func (suite *AddressMergeSuite) TestMerge() {
suite.assertAddresses(
[]string{
"lo/127.0.0.1/8",
"eth0/10.0.0.35/32",
"eth0/10.0.0.1/8",
}, func(*network.AddressSpec, *assert.Assertions) {},
)
suite.Assert().NoError(
Expand Down
142 changes: 142 additions & 0 deletions internal/app/machined/pkg/controllers/network/generic_merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package network

import (
"context"
"fmt"
"strings"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/resource/typed"
"github.com/cosi-project/runtime/pkg/safe"
"go.uber.org/zap"
)

type genericMergeFunc[T typed.DeepCopyable[T], E typed.Extension, R typed.Resource[T, E]] func(logger *zap.Logger, in safe.List[*R]) map[resource.ID]*T

// GenericMergeController initializes a generic merge controller for network resources.
func GenericMergeController[T typed.DeepCopyable[T], E typed.Extension](namespaceIn, namespaceOut resource.Namespace, mergeFunc genericMergeFunc[T, E, typed.Resource[T, E]]) controller.Controller {
var zeroE E

controllerName := strings.ReplaceAll(zeroE.ResourceDefinition().Type, "Spec", "MergeController")

return &genericMergeController[T, E]{
controllerName: controllerName,
resourceType: zeroE.ResourceDefinition().Type,
namespaceIn: namespaceIn,
namespaceOut: namespaceOut,
mergeFunc: mergeFunc,
}
}

type genericMergeController[T typed.DeepCopyable[T], E typed.Extension] struct {
controllerName string
resourceType resource.Type
namespaceIn resource.Namespace
namespaceOut resource.Namespace
mergeFunc genericMergeFunc[T, E, typed.Resource[T, E]]
}

func (ctrl *genericMergeController[T, E]) Name() string {
return ctrl.controllerName
}

func (ctrl *genericMergeController[T, E]) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: ctrl.namespaceIn,
Type: ctrl.resourceType,
Kind: controller.InputWeak,
},
{
Namespace: ctrl.namespaceOut,
Type: ctrl.resourceType,
Kind: controller.InputDestroyReady,
},
}
}

func (ctrl *genericMergeController[T, E]) Outputs() []controller.Output {
return []controller.Output{
{
Type: ctrl.resourceType,
Kind: controller.OutputShared,
},
}
}

//nolint:gocyclo
func (ctrl *genericMergeController[T, E]) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
}

type R = typed.Resource[T, E]

// list source network configuration resources
in, err := safe.ReaderList[*R](ctx, r, resource.NewMetadata(ctrl.namespaceIn, ctrl.resourceType, "", resource.VersionUndefined))
if err != nil {
return fmt.Errorf("error listing source network resources: %w", err)
}

merged := ctrl.mergeFunc(logger, in)

// cleanup resources, detecting conflicts on the way
out, err := safe.ReaderList[*R](ctx, r, resource.NewMetadata(ctrl.namespaceOut, ctrl.resourceType, "", resource.VersionUndefined))
if err != nil {
return fmt.Errorf("error listing output resources: %w", err)
}

for res := range out.All() {
shouldBeDestroyed := false
if _, ok := merged[res.Metadata().ID()]; !ok {
shouldBeDestroyed = true
}

isTearingDown := res.Metadata().Phase() == resource.PhaseTearingDown

if shouldBeDestroyed || isTearingDown {
var okToDestroy bool

okToDestroy, err = r.Teardown(ctx, res.Metadata())
if err != nil {
return fmt.Errorf("error cleaning up addresses: %w", err)
}

if okToDestroy {
if err = r.Destroy(ctx, res.Metadata()); err != nil {
return fmt.Errorf("error cleaning up addresses: %w", err)
}
} else if !shouldBeDestroyed {
// resource is not ready to be destroyed yet, skip it
delete(merged, res.Metadata().ID())
}
}
}

var zeroT T

for id, spec := range merged {
if err = safe.WriterModify(ctx, r,
typed.NewResource[T, E](resource.NewMetadata(ctrl.namespaceOut, ctrl.resourceType, id, resource.VersionUndefined), zeroT),
func(r *R) error {
*r.TypedSpec() = *spec

return nil
}); err != nil {
return fmt.Errorf("error updating resource: %w", err)
}

logger.Debug("merged spec", zap.String("id", id), zap.Any("spec", spec))
}

r.ResetRestartBackoff()
}
}
Loading

0 comments on commit 807a3cd

Please sign in to comment.