Skip to content

Commit

Permalink
Merge pull request #1102 from dgageot/improve-sync
Browse files Browse the repository at this point in the history
Improve sync
  • Loading branch information
dgageot authored Oct 6, 2018
2 parents ee4f75b + 3e8f198 commit 227ed25
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 48 deletions.
71 changes: 38 additions & 33 deletions pkg/skaffold/kubernetes/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,77 +17,82 @@ limitations under the License.
package kubernetes

import (
"context"
"fmt"
"os/exec"
"strings"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/sync"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type KubectlSyncer struct{}

func (k *KubectlSyncer) Sync(s *sync.Item) error {
if err := perform(s.Image, s.Copy, copyFileFn); err != nil {
func (k *KubectlSyncer) Sync(ctx context.Context, s *sync.Item) error {
logrus.Infoln("Copying files:", s.Copy, "to", s.Image)

if err := perform(ctx, s.Image, s.Copy, copyFileFn); err != nil {
return errors.Wrap(err, "copying files")
}
if err := perform(s.Image, s.Delete, deleteFileFn); err != nil {

logrus.Infoln("Deleting files:", s.Delete, "from", s.Image)

if err := perform(ctx, s.Image, s.Delete, deleteFileFn); err != nil {
return errors.Wrap(err, "deleting files")
}

return nil
}

func deleteFileFn(pod v1.Pod, container v1.Container, src, dst string) *exec.Cmd {
return exec.Command("kubectl", "exec", pod.Name, "--namespace", pod.Namespace, "-c", container.Name, "--", "rm", "-rf", dst)
func deleteFileFn(ctx context.Context, pod v1.Pod, container v1.Container, src, dst string) *exec.Cmd {
return exec.CommandContext(ctx, "kubectl", "exec", pod.Name, "--namespace", pod.Namespace, "-c", container.Name, "--", "rm", "-rf", dst)
}

func copyFileFn(pod v1.Pod, container v1.Container, src, dst string) *exec.Cmd {
return exec.Command("kubectl", "cp", src, fmt.Sprintf("%s/%s:%s", pod.Namespace, pod.Name, dst), "-c", container.Name)
func copyFileFn(ctx context.Context, pod v1.Pod, container v1.Container, src, dst string) *exec.Cmd {
return exec.CommandContext(ctx, "kubectl", "cp", src, fmt.Sprintf("%s/%s:%s", pod.Namespace, pod.Name, dst), "-c", container.Name)
}

func labelSelector() string {
var reqs []string
for k, v := range constants.Labels.DefaultLabels {
reqs = append(reqs, fmt.Sprintf("%s=%s", k, v))
func perform(ctx context.Context, image string, files map[string]string, cmdFn func(context.Context, v1.Pod, v1.Container, string, string) *exec.Cmd) error {
if len(files) == 0 {
return nil
}
return strings.Join(reqs, ",")
}

func perform(image string, files map[string]string, cmdFn func(v1.Pod, v1.Container, string, string) *exec.Cmd) error {
logrus.Info("Syncing files:", files)
client, err := Client()
if err != nil {
return errors.Wrap(err, "getting k8s client")
}
pods, err := client.CoreV1().Pods("").List(meta_v1.ListOptions{
LabelSelector: labelSelector(),
})

pods, err := client.CoreV1().Pods("").List(meta_v1.ListOptions{})
if err != nil {
return errors.Wrap(err, "getting pods")
}

synced := map[string]bool{}

for _, p := range pods.Items {
for _, c := range p.Spec.Containers {
if c.Image == image {
var e errgroup.Group
for src, dst := range files {
src, dst := src, dst
e.Go(func() error {
cmd := cmdFn(p, c, src, dst)
return util.RunCmd(cmd)
})
}
if err := e.Wait(); err != nil {
return errors.Wrap(err, "syncing files")
if c.Image != image {
continue
}

for src, dst := range files {
cmd := cmdFn(ctx, p, c, src, dst)
if err := util.RunCmd(cmd); err != nil {
return err
}

synced[src] = true
}
}
}

if len(synced) != len(files) {
return errors.New("couldn't sync all the files")
}

return nil
}
12 changes: 8 additions & 4 deletions pkg/skaffold/kubernetes/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package kubernetes

import (
"context"
"fmt"
"os/exec"
"strings"
Expand Down Expand Up @@ -49,8 +50,8 @@ func (t *TestCmdRecorder) RunCmdOut(cmd *exec.Cmd) ([]byte, error) {
return nil, t.RunCmd(cmd)
}

func fakeCmd(p v1.Pod, c v1.Container, src, dst string) *exec.Cmd {
return exec.Command("copy", src, dst)
func fakeCmd(ctx context.Context, p v1.Pod, c v1.Container, src, dst string) *exec.Cmd {
return exec.CommandContext(ctx, "copy", src, dst)
}

var pod = &v1.Pod{
Expand All @@ -76,7 +77,7 @@ func TestPerform(t *testing.T) {
description string
image string
files map[string]string
cmdFn func(v1.Pod, v1.Container, string, string) *exec.Cmd
cmdFn func(context.Context, v1.Pod, v1.Container, string, string) *exec.Cmd
cmdErr error
clientErr error
expected []string
Expand Down Expand Up @@ -110,6 +111,7 @@ func TestPerform(t *testing.T) {
image: "gcr.io/different-pod:123",
files: map[string]string{"test.go": "/test.go"},
cmdFn: fakeCmd,
shouldErr: true,
},
}

Expand All @@ -125,7 +127,9 @@ func TestPerform(t *testing.T) {
}

util.DefaultExecCommand = cmdRecord
err := perform(test.image, test.files, test.cmdFn)

err := perform(context.Background(), test.image, test.files, test.cmdFn)

testutil.CheckErrorAndDeepEqual(t, test.shouldErr, err, test.expected, cmdRecord.cmds)
})
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/skaffold/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/build/kaniko"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/build/local"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/build/tag"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/color"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/config"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/deploy"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/docker"
Expand Down Expand Up @@ -259,8 +260,7 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la
}
if s != nil {
changed.AddResync(s)
}
if s == nil {
} else {
changed.AddRebuild(a.artifact)
}
}
Expand All @@ -271,12 +271,12 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la
return ErrorConfigurationChanged
case len(changed.needsResync) > 0:
for _, s := range changed.needsResync {
if err := r.Syncer.Sync(s); err != nil {
color.Default.Fprintf(out, "Syncing %d files for %s\n", len(s.Copy)+len(s.Delete), s.Image)

if err := r.Syncer.Sync(ctx, s); err != nil {
logrus.Warnln("Skipping build and deploy due to sync error:", err)
return nil
}
logrus.Infof("Synced %d files for %s", len(s.Copy)+len(s.Delete), s.Image)
logrus.Debugf("Synced files for %s...\nCopied: %s\nDeleted: %s\n", s.Image, s.Copy, s.Delete)
}
case len(changed.needsRebuild) > 0:
bRes, err := r.Build(ctx, out, r.Tagger, changed.needsRebuild)
Expand Down
2 changes: 1 addition & 1 deletion pkg/skaffold/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func NewTestSyncer() *TestSyncer {
}
}

func (t *TestSyncer) Sync(s *sync.Item) error {
func (t *TestSyncer) Sync(ctx context.Context, s *sync.Item) error {
if t.err != nil {
return t.err
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/skaffold/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package sync

import (
"context"
"fmt"
"path"
"path/filepath"
Expand All @@ -29,7 +30,7 @@ import (
)

type Syncer interface {
Sync(s *Item) error
Sync(context.Context, *Item) error
}

type Item struct {
Expand All @@ -40,7 +41,7 @@ type Item struct {

func NewItem(a *latest.Artifact, e watch.Events, builds []build.Artifact) (*Item, error) {
// If there are no changes, short circuit and don't sync anything
if !e.HasChanged() || a.Sync == nil || len(a.Sync) == 0 {
if !e.HasChanged() || len(a.Sync) == 0 {
return nil, nil
}

Expand All @@ -51,16 +52,17 @@ func NewItem(a *latest.Artifact, e watch.Events, builds []build.Artifact) (*Item

toDelete, err := intersect(a.Workspace, a.Sync, e.Deleted)
if err != nil {
return nil, errors.Wrap(err, "intersecting sync map and added, modified files")
return nil, errors.Wrap(err, "intersecting sync map and deleted files")
}

// Something went wrong, don't sync, rebuild.
if toCopy == nil || toDelete == nil {
return nil, nil
}

tag := latestTag(a.ImageName, builds)
if tag == "" {
return nil, fmt.Errorf("Could not find latest tag for image %s in builds: %s", a.ImageName, builds)
return nil, fmt.Errorf("could not find latest tag for image %s in builds: %v", a.ImageName, builds)
}

return &Item{
Expand Down
1 change: 0 additions & 1 deletion pkg/skaffold/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/build"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/watch"
"github.com/GoogleContainerTools/skaffold/testutil"
)
Expand Down

0 comments on commit 227ed25

Please sign in to comment.