Skip to content
This repository was archived by the owner on Nov 1, 2022. It is now read-only.

Return sync errors in ListServices #1410

Merged
merged 3 commits into from
Oct 26, 2018
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Skip multi sync if previously failed
`fluxsync.Sync()` applies everything as a multidoc by default. If any
apply fails, the next `Sync()` will apply the failed ones one by one
while the rest is still applied as a multidoc.
rndstr committed Oct 25, 2018
commit 936d12134ecf686ca995aaa2de850b3472efea28
2 changes: 1 addition & 1 deletion cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ type Cluster interface {
SomeControllers([]flux.ResourceID) ([]Controller, error)
Ping() error
Export() ([]byte, error)
Sync(SyncDef) error
Sync(SyncDef, map[flux.ResourceID]error) error
PublicSSHKey(regenerate bool) (ssh.PublicKey, error)
}

4 changes: 2 additions & 2 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
@@ -191,7 +191,7 @@ func (c *Cluster) AllControllers(namespace string) (res []cluster.Controller, er

// Sync performs the given actions on resources. Operations are
// asynchronous, but serialised.
func (c *Cluster) Sync(spec cluster.SyncDef) error {
func (c *Cluster) Sync(spec cluster.SyncDef, errored map[flux.ResourceID]error) error {
logger := log.With(c.logger, "method", "Sync")

cs := makeChangeSet()
@@ -221,7 +221,7 @@ func (c *Cluster) Sync(spec cluster.SyncDef) error {

c.mu.Lock()
defer c.mu.Unlock()
if applyErrs := c.applier.apply(logger, cs); len(applyErrs) > 0 {
if applyErrs := c.applier.apply(logger, cs, errored); len(applyErrs) > 0 {
errs = append(errs, applyErrs...)
}

32 changes: 26 additions & 6 deletions cluster/kubernetes/sync.go
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
)

@@ -30,7 +31,7 @@ func (c *changeSet) stage(cmd string, o *apiObject) {

// Applier is something that will apply a changeset to the cluster.
type Applier interface {
apply(log.Logger, changeSet) cluster.SyncError
apply(log.Logger, changeSet, map[flux.ResourceID]error) cluster.SyncError
}

type Kubectl struct {
@@ -113,21 +114,40 @@ func (objs applyOrder) Less(i, j int) bool {
return ranki < rankj
}

func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncError) {
func (c *Kubectl) apply(logger log.Logger, cs changeSet, errored map[flux.ResourceID]error) (errs cluster.SyncError) {
f := func(objs []*apiObject, cmd string, args ...string) {
if len(objs) == 0 {
return
}
logger.Log("cmd", cmd, "args", strings.Join(args, " "), "count", len(objs))
args = append(args, cmd)
if err := c.doCommand(logger, makeMultidoc(objs), args...); err != nil {

var multi, single []*apiObject
if len(errored) == 0 {
multi = objs
} else {
for _, obj := range objs {
r := bytes.NewReader(obj.Bytes())
if err := c.doCommand(logger, r, args...); err != nil {
errs = append(errs, cluster.ResourceError{obj.Resource, err})
if _, ok := errored[obj.ResourceID()]; ok {
// Resources that errored before shall be applied separately
single = append(single, obj)
} else {
// everything else will be tried in a multidoc apply.
multi = append(multi, obj)
}
}
}

if len(multi) > 0 {
if err := c.doCommand(logger, makeMultidoc(multi), args...); err != nil {
single = append(single, multi...)
}
}
for _, obj := range single {
r := bytes.NewReader(obj.Bytes())
if err := c.doCommand(logger, r, args...); err != nil {
errs = append(errs, cluster.ResourceError{obj.Resource, err})
}
}
}

// When deleting objects, the only real concern is that we don't
6 changes: 3 additions & 3 deletions cluster/kubernetes/sync_test.go
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ type mockApplier struct {
commandRun bool
}

func (m *mockApplier) apply(_ log.Logger, c changeSet) cluster.SyncError {
func (m *mockApplier) apply(_ log.Logger, c changeSet, errored map[flux.ResourceID]error) cluster.SyncError {
if len(c.objs) != 0 {
m.commandRun = true
}
@@ -56,7 +56,7 @@ func setup(t *testing.T) (*Cluster, *mockApplier) {

func TestSyncNop(t *testing.T) {
kube, mock := setup(t)
if err := kube.Sync(cluster.SyncDef{}); err != nil {
if err := kube.Sync(cluster.SyncDef{}, nil); err != nil {
t.Errorf("%#v", err)
}
if mock.commandRun {
@@ -72,7 +72,7 @@ func TestSyncMalformed(t *testing.T) {
Apply: rsc{"id", []byte("garbage")},
},
},
})
}, nil)
if err == nil {
t.Error("expected error because malformed resource def, but got nil")
}
2 changes: 1 addition & 1 deletion cluster/mock.go
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ func (m *Mock) Export() ([]byte, error) {
return m.ExportFunc()
}

func (m *Mock) Sync(c SyncDef) error {
func (m *Mock) Sync(c SyncDef, errored map[flux.ResourceID]error) error {
return m.SyncFunc(c)
}

4 changes: 2 additions & 2 deletions daemon/loop.go
Original file line number Diff line number Diff line change
@@ -194,7 +194,7 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
syncErrors := make(map[flux.ResourceID]error)
var resourceErrors []event.ResourceError
// TODO supply deletes argument from somewhere (command-line?)
if err := fluxsync.Sync(logger, d.Manifests, allResources, d.Cluster, false); err != nil {
if err := fluxsync.Sync(logger, d.Manifests, allResources, d.Cluster, false, d.syncErrors.errs); err != nil {
logger.Log("err", err)
switch syncerr := err.(type) {
case cluster.SyncError:
@@ -212,7 +212,7 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
}
// Since fluxsync.Sync() applies *all* resources we replace
// all errors here. If there is a recurring issue it will
// show up every time when they sync is run.
// show up every time when the sync is run.
d.syncErrors.mu.Lock()
d.syncErrors.errs = syncErrors
d.syncErrors.mu.Unlock()
6 changes: 4 additions & 2 deletions sync/sync.go
Original file line number Diff line number Diff line change
@@ -4,13 +4,15 @@ import (
"github.com/go-kit/kit/log"
"github.com/pkg/errors"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/policy"
"github.com/weaveworks/flux/resource"
)

// Sync synchronises the cluster to the files in a directory
func Sync(logger log.Logger, m cluster.Manifests, repoResources map[string]resource.Resource, clus cluster.Cluster, deletes bool) error {
func Sync(logger log.Logger, m cluster.Manifests, repoResources map[string]resource.Resource, clus cluster.Cluster,
deletes bool, errored map[flux.ResourceID]error) error {
// Get a map of resources defined in the cluster
clusterBytes, err := clus.Export()

@@ -42,7 +44,7 @@ func Sync(logger log.Logger, m cluster.Manifests, repoResources map[string]resou
prepareSyncApply(logger, clusterResources, id, res, &sync)
}

return clus.Sync(sync)
return clus.Sync(sync, errored)
}

func prepareSyncDelete(logger log.Logger, repoResources map[string]resource.Resource, id string, res resource.Resource, sync *cluster.SyncDef) {
7 changes: 4 additions & 3 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ import (
// "github.com/weaveworks/flux"
"context"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/cluster/kubernetes"
"github.com/weaveworks/flux/cluster/kubernetes/testfiles"
@@ -40,7 +41,7 @@ func TestSync(t *testing.T) {
t.Fatal(err)
}

if err := Sync(log.NewNopLogger(), manifests, resources, clus, true); err != nil {
if err := Sync(log.NewNopLogger(), manifests, resources, clus, true, nil); err != nil {
t.Fatal(err)
}
checkClusterMatchesFiles(t, manifests, clus, checkout.Dir(), dirs)
@@ -60,7 +61,7 @@ func TestSync(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := Sync(log.NewNopLogger(), manifests, resources, clus, true); err != nil {
if err := Sync(log.NewNopLogger(), manifests, resources, clus, true, nil); err != nil {
t.Fatal(err)
}
checkClusterMatchesFiles(t, manifests, clus, checkout.Dir(), dirs)
@@ -200,7 +201,7 @@ type syncCluster struct {
resources map[string][]byte
}

func (p *syncCluster) Sync(def cluster.SyncDef) error {
func (p *syncCluster) Sync(def cluster.SyncDef, errored map[flux.ResourceID]error) error {
println("=== Syncing ===")
for _, action := range def.Actions {
if action.Delete != nil {