Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve sync #1102

Merged
merged 11 commits into from
Oct 6, 2018
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 52 additions & 36 deletions pkg/skaffold/kubernetes/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ limitations under the License.
package kubernetes

import (
"context"
"fmt"
"os/exec"
"strings"
"sync/atomic"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/sync"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
"github.com/sirupsen/logrus"
Expand All @@ -34,60 +34,76 @@ import (

type KubectlSyncer struct{}

func (k *KubectlSyncer) Sync(s *sync.Item) error {
if err := perform(s.Image, s.Copy, copyFileFn); err != nil {
return errors.Wrap(err, "copying files")
func (k *KubectlSyncer) Sync(ctx context.Context, s *sync.Item) error {
if len(s.Copy) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe move the len check into perform? a few lines of code saved

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do that, yes.

logrus.Infoln("Copying files:", s.Copy, "to", s.Image)

if err := perform(ctx, s.Image, s.Copy, copyFileFn); err != nil {
return errors.Wrap(err, "copying files")
}
}
if err := perform(s.Image, s.Delete, deleteFileFn); err != nil {
return errors.Wrap(err, "deleting files")

if len(s.Delete) > 0 {
logrus.Infoln("Deleting files:", s.Delete, "from", s.Image)

if err := perform(ctx, s.Image, s.Delete, deleteFileFn); err != nil {
return errors.Wrap(err, "deleting files")
}
}
return nil
}

func deleteFileFn(pod v1.Pod, container v1.Container, src, dst string) *exec.Cmd {
return exec.Command("kubectl", "exec", pod.Name, "--namespace", pod.Namespace, "-c", container.Name, "--", "rm", "-rf", dst)
return nil
}

func copyFileFn(pod v1.Pod, container v1.Container, src, dst string) *exec.Cmd {
return exec.Command("kubectl", "cp", src, fmt.Sprintf("%s/%s:%s", pod.Namespace, pod.Name, dst), "-c", container.Name)
func deleteFileFn(ctx context.Context, pod v1.Pod, container v1.Container, src, dst string) *exec.Cmd {
return exec.CommandContext(ctx, "kubectl", "exec", pod.Name, "--namespace", pod.Namespace, "-c", container.Name, "--", "rm", "-rf", dst)
}

func labelSelector() string {
var reqs []string
for k, v := range constants.Labels.DefaultLabels {
reqs = append(reqs, fmt.Sprintf("%s=%s", k, v))
}
return strings.Join(reqs, ",")
func copyFileFn(ctx context.Context, pod v1.Pod, container v1.Container, src, dst string) *exec.Cmd {
return exec.CommandContext(ctx, "kubectl", "cp", src, fmt.Sprintf("%s/%s:%s", pod.Namespace, pod.Name, dst), "-c", container.Name)
}

func perform(image string, files map[string]string, cmdFn func(v1.Pod, v1.Container, string, string) *exec.Cmd) error {
logrus.Info("Syncing files:", files)
func perform(ctx context.Context, image string, files map[string]string, cmdFn func(context.Context, v1.Pod, v1.Container, string, string) *exec.Cmd) error {
client, err := Client()
if err != nil {
return errors.Wrap(err, "getting k8s client")
}
pods, err := client.CoreV1().Pods("").List(meta_v1.ListOptions{
LabelSelector: labelSelector(),
})

pods, err := client.CoreV1().Pods("").List(meta_v1.ListOptions{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use the skaffold labelselector here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed it because it wasn't working. What I saw is that when I use a deployment, this deployment gets the labels but not the pods.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah good point 👍

if err != nil {
return errors.Wrap(err, "getting pods")
}

var performed int32
var e errgroup.Group

for _, p := range pods.Items {
for _, c := range p.Spec.Containers {
if c.Image == image {
var e errgroup.Group
for src, dst := range files {
src, dst := src, dst
e.Go(func() error {
cmd := cmdFn(p, c, src, dst)
return util.RunCmd(cmd)
})
}
if err := e.Wait(); err != nil {
return errors.Wrap(err, "syncing files")
}
if c.Image != image {
continue
}

for src, dst := range files {
cmd := cmdFn(ctx, p, c, src, dst)

e.Go(func() error {
if err := util.RunCmd(cmd); err != nil {
return err
}

atomic.AddInt32(&performed, 1)
return nil
})
}
}
}

if err := e.Wait(); err != nil {
return errors.Wrap(err, "syncing files")
}

if int(performed) != len(files) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need to count here. errgroup handles this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check is to make sure that we try to perform the sync for n files. It has nothing to do with errors. For example, with the code that filters on skaffold labels, it always syncs zero files without giving an error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I guess this is a bit tricky. What if we have 2 replicas?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I'll see how to fix it. Each file should be synced at least once, I guess.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think thats the best we can do. But making sure we sync at least once is a good check.

return errors.New("couldn't sync all the files")
}

return nil
}
12 changes: 8 additions & 4 deletions pkg/skaffold/kubernetes/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package kubernetes

import (
"context"
"fmt"
"os/exec"
"strings"
Expand Down Expand Up @@ -49,8 +50,8 @@ func (t *TestCmdRecorder) RunCmdOut(cmd *exec.Cmd) ([]byte, error) {
return nil, t.RunCmd(cmd)
}

func fakeCmd(p v1.Pod, c v1.Container, src, dst string) *exec.Cmd {
return exec.Command("copy", src, dst)
func fakeCmd(ctx context.Context, p v1.Pod, c v1.Container, src, dst string) *exec.Cmd {
return exec.CommandContext(ctx, "copy", src, dst)
}

var pod = &v1.Pod{
Expand All @@ -76,7 +77,7 @@ func TestPerform(t *testing.T) {
description string
image string
files map[string]string
cmdFn func(v1.Pod, v1.Container, string, string) *exec.Cmd
cmdFn func(context.Context, v1.Pod, v1.Container, string, string) *exec.Cmd
cmdErr error
clientErr error
expected []string
Expand Down Expand Up @@ -110,6 +111,7 @@ func TestPerform(t *testing.T) {
image: "gcr.io/different-pod:123",
files: map[string]string{"test.go": "/test.go"},
cmdFn: fakeCmd,
shouldErr: true,
},
}

Expand All @@ -125,7 +127,9 @@ func TestPerform(t *testing.T) {
}

util.DefaultExecCommand = cmdRecord
err := perform(test.image, test.files, test.cmdFn)

err := perform(context.Background(), test.image, test.files, test.cmdFn)

testutil.CheckErrorAndDeepEqual(t, test.shouldErr, err, test.expected, cmdRecord.cmds)
})
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/skaffold/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/build/kaniko"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/build/local"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/build/tag"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/color"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/config"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/deploy"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/docker"
Expand Down Expand Up @@ -259,8 +260,7 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la
}
if s != nil {
changed.AddResync(s)
}
if s == nil {
} else {
changed.AddRebuild(a.artifact)
}
}
Expand All @@ -271,12 +271,12 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la
return ErrorConfigurationChanged
case len(changed.needsResync) > 0:
for _, s := range changed.needsResync {
if err := r.Syncer.Sync(s); err != nil {
color.Default.Fprintf(out, "Syncing %d files for %s\n", len(s.Copy)+len(s.Delete), s.Image)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update to latest code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean I'm rebased on old code?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah just maybe the last commit. I changed this message to the debug and info statement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, the one I asked you to add? It didn't work well in reality :-) Didn't give enough feedback

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:) SGTM


if err := r.Syncer.Sync(ctx, s); err != nil {
logrus.Warnln("Skipping build and deploy due to sync error:", err)
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot to add this but if we get an error here, we should add that dirty artifact back to the rebuild list

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would you do that?

}
logrus.Infof("Synced %d files for %s", len(s.Copy)+len(s.Delete), s.Image)
logrus.Debugf("Synced files for %s...\nCopied: %s\nDeleted: %s\n", s.Image, s.Copy, s.Delete)
}
case len(changed.needsRebuild) > 0:
bRes, err := r.Build(ctx, out, r.Tagger, changed.needsRebuild)
Expand Down
2 changes: 1 addition & 1 deletion pkg/skaffold/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func NewTestSyncer() *TestSyncer {
}
}

func (t *TestSyncer) Sync(s *sync.Item) error {
func (t *TestSyncer) Sync(ctx context.Context, s *sync.Item) error {
if t.err != nil {
return t.err
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/skaffold/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package sync

import (
"context"
"fmt"
"path"
"path/filepath"
Expand All @@ -29,7 +30,7 @@ import (
)

type Syncer interface {
Sync(s *Item) error
Sync(context.Context, *Item) error
}

type Item struct {
Expand All @@ -40,7 +41,7 @@ type Item struct {

func NewItem(a *latest.Artifact, e watch.Events, builds []build.Artifact) (*Item, error) {
// If there are no changes, short circuit and don't sync anything
if !e.HasChanged() || a.Sync == nil || len(a.Sync) == 0 {
if !e.HasChanged() || len(a.Sync) == 0 {
return nil, nil
}

Expand All @@ -51,16 +52,17 @@ func NewItem(a *latest.Artifact, e watch.Events, builds []build.Artifact) (*Item

toDelete, err := intersect(a.Workspace, a.Sync, e.Deleted)
if err != nil {
return nil, errors.Wrap(err, "intersecting sync map and added, modified files")
return nil, errors.Wrap(err, "intersecting sync map and deleted files")
}

// Something went wrong, don't sync, rebuild.
if toCopy == nil || toDelete == nil {
return nil, nil
}

tag := latestTag(a.ImageName, builds)
if tag == "" {
return nil, fmt.Errorf("Could not find latest tag for image %s in builds: %s", a.ImageName, builds)
return nil, fmt.Errorf("could not find latest tag for image %s in builds: %v", a.ImageName, builds)
}

return &Item{
Expand Down
1 change: 0 additions & 1 deletion pkg/skaffold/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/build"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/watch"
"github.com/GoogleContainerTools/skaffold/testutil"
)
Expand Down