diff --git a/pkg/skaffold/kubernetes/sync.go b/pkg/skaffold/kubernetes/sync.go index 511f35563b2..4e442c62921 100644 --- a/pkg/skaffold/kubernetes/sync.go +++ b/pkg/skaffold/kubernetes/sync.go @@ -17,77 +17,82 @@ limitations under the License. package kubernetes import ( + "context" "fmt" "os/exec" - "strings" - "github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/sync" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/util" - "github.com/sirupsen/logrus" - "golang.org/x/sync/errgroup" "github.com/pkg/errors" + "github.com/sirupsen/logrus" "k8s.io/api/core/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type KubectlSyncer struct{} -func (k *KubectlSyncer) Sync(s *sync.Item) error { - if err := perform(s.Image, s.Copy, copyFileFn); err != nil { +func (k *KubectlSyncer) Sync(ctx context.Context, s *sync.Item) error { + logrus.Infoln("Copying files:", s.Copy, "to", s.Image) + + if err := perform(ctx, s.Image, s.Copy, copyFileFn); err != nil { return errors.Wrap(err, "copying files") } - if err := perform(s.Image, s.Delete, deleteFileFn); err != nil { + + logrus.Infoln("Deleting files:", s.Delete, "from", s.Image) + + if err := perform(ctx, s.Image, s.Delete, deleteFileFn); err != nil { return errors.Wrap(err, "deleting files") } + return nil } -func deleteFileFn(pod v1.Pod, container v1.Container, src, dst string) *exec.Cmd { - return exec.Command("kubectl", "exec", pod.Name, "--namespace", pod.Namespace, "-c", container.Name, "--", "rm", "-rf", dst) +func deleteFileFn(ctx context.Context, pod v1.Pod, container v1.Container, src, dst string) *exec.Cmd { + return exec.CommandContext(ctx, "kubectl", "exec", pod.Name, "--namespace", pod.Namespace, "-c", container.Name, "--", "rm", "-rf", dst) } -func copyFileFn(pod v1.Pod, container v1.Container, src, dst string) *exec.Cmd { - return exec.Command("kubectl", "cp", src, fmt.Sprintf("%s/%s:%s", pod.Namespace, pod.Name, dst), "-c", container.Name) +func copyFileFn(ctx context.Context, pod v1.Pod, container v1.Container, src, dst string) *exec.Cmd { + return exec.CommandContext(ctx, "kubectl", "cp", src, fmt.Sprintf("%s/%s:%s", pod.Namespace, pod.Name, dst), "-c", container.Name) } -func labelSelector() string { - var reqs []string - for k, v := range constants.Labels.DefaultLabels { - reqs = append(reqs, fmt.Sprintf("%s=%s", k, v)) +func perform(ctx context.Context, image string, files map[string]string, cmdFn func(context.Context, v1.Pod, v1.Container, string, string) *exec.Cmd) error { + if len(files) == 0 { + return nil } - return strings.Join(reqs, ",") -} -func perform(image string, files map[string]string, cmdFn func(v1.Pod, v1.Container, string, string) *exec.Cmd) error { - logrus.Info("Syncing files:", files) client, err := Client() if err != nil { return errors.Wrap(err, "getting k8s client") } - pods, err := client.CoreV1().Pods("").List(meta_v1.ListOptions{ - LabelSelector: labelSelector(), - }) + + pods, err := client.CoreV1().Pods("").List(meta_v1.ListOptions{}) if err != nil { return errors.Wrap(err, "getting pods") } + + synced := map[string]bool{} + for _, p := range pods.Items { for _, c := range p.Spec.Containers { - if c.Image == image { - var e errgroup.Group - for src, dst := range files { - src, dst := src, dst - e.Go(func() error { - cmd := cmdFn(p, c, src, dst) - return util.RunCmd(cmd) - }) - } - if err := e.Wait(); err != nil { - return errors.Wrap(err, "syncing files") + if c.Image != image { + continue + } + + for src, dst := range files { + cmd := cmdFn(ctx, p, c, src, dst) + if err := util.RunCmd(cmd); err != nil { + return err } + + synced[src] = true } } } + + if len(synced) != len(files) { + return errors.New("couldn't sync all the files") + } + return nil } diff --git a/pkg/skaffold/kubernetes/sync_test.go b/pkg/skaffold/kubernetes/sync_test.go index 2cc70021dc0..36910ee3755 100644 --- a/pkg/skaffold/kubernetes/sync_test.go +++ b/pkg/skaffold/kubernetes/sync_test.go @@ -17,6 +17,7 @@ limitations under the License. package kubernetes import ( + "context" "fmt" "os/exec" "strings" @@ -49,8 +50,8 @@ func (t *TestCmdRecorder) RunCmdOut(cmd *exec.Cmd) ([]byte, error) { return nil, t.RunCmd(cmd) } -func fakeCmd(p v1.Pod, c v1.Container, src, dst string) *exec.Cmd { - return exec.Command("copy", src, dst) +func fakeCmd(ctx context.Context, p v1.Pod, c v1.Container, src, dst string) *exec.Cmd { + return exec.CommandContext(ctx, "copy", src, dst) } var pod = &v1.Pod{ @@ -76,7 +77,7 @@ func TestPerform(t *testing.T) { description string image string files map[string]string - cmdFn func(v1.Pod, v1.Container, string, string) *exec.Cmd + cmdFn func(context.Context, v1.Pod, v1.Container, string, string) *exec.Cmd cmdErr error clientErr error expected []string @@ -110,6 +111,7 @@ func TestPerform(t *testing.T) { image: "gcr.io/different-pod:123", files: map[string]string{"test.go": "/test.go"}, cmdFn: fakeCmd, + shouldErr: true, }, } @@ -125,7 +127,9 @@ func TestPerform(t *testing.T) { } util.DefaultExecCommand = cmdRecord - err := perform(test.image, test.files, test.cmdFn) + + err := perform(context.Background(), test.image, test.files, test.cmdFn) + testutil.CheckErrorAndDeepEqual(t, test.shouldErr, err, test.expected, cmdRecord.cmds) }) } diff --git a/pkg/skaffold/runner/runner.go b/pkg/skaffold/runner/runner.go index 5a41ef1ac01..c7376b3801a 100644 --- a/pkg/skaffold/runner/runner.go +++ b/pkg/skaffold/runner/runner.go @@ -30,6 +30,7 @@ import ( "github.com/GoogleContainerTools/skaffold/pkg/skaffold/build/kaniko" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/build/local" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/build/tag" + "github.com/GoogleContainerTools/skaffold/pkg/skaffold/color" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/config" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/deploy" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/docker" @@ -259,8 +260,7 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la } if s != nil { changed.AddResync(s) - } - if s == nil { + } else { changed.AddRebuild(a.artifact) } } @@ -271,12 +271,12 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la return ErrorConfigurationChanged case len(changed.needsResync) > 0: for _, s := range changed.needsResync { - if err := r.Syncer.Sync(s); err != nil { + color.Default.Fprintf(out, "Syncing %d files for %s\n", len(s.Copy)+len(s.Delete), s.Image) + + if err := r.Syncer.Sync(ctx, s); err != nil { logrus.Warnln("Skipping build and deploy due to sync error:", err) return nil } - logrus.Infof("Synced %d files for %s", len(s.Copy)+len(s.Delete), s.Image) - logrus.Debugf("Synced files for %s...\nCopied: %s\nDeleted: %s\n", s.Image, s.Copy, s.Delete) } case len(changed.needsRebuild) > 0: bRes, err := r.Build(ctx, out, r.Tagger, changed.needsRebuild) diff --git a/pkg/skaffold/runner/runner_test.go b/pkg/skaffold/runner/runner_test.go index bbb09d8445a..8e0ed10afa3 100644 --- a/pkg/skaffold/runner/runner_test.go +++ b/pkg/skaffold/runner/runner_test.go @@ -123,7 +123,7 @@ func NewTestSyncer() *TestSyncer { } } -func (t *TestSyncer) Sync(s *sync.Item) error { +func (t *TestSyncer) Sync(ctx context.Context, s *sync.Item) error { if t.err != nil { return t.err } diff --git a/pkg/skaffold/sync/sync.go b/pkg/skaffold/sync/sync.go index d53ab842b0b..d5849480a79 100644 --- a/pkg/skaffold/sync/sync.go +++ b/pkg/skaffold/sync/sync.go @@ -17,6 +17,7 @@ limitations under the License. package sync import ( + "context" "fmt" "path" "path/filepath" @@ -29,7 +30,7 @@ import ( ) type Syncer interface { - Sync(s *Item) error + Sync(context.Context, *Item) error } type Item struct { @@ -40,7 +41,7 @@ type Item struct { func NewItem(a *latest.Artifact, e watch.Events, builds []build.Artifact) (*Item, error) { // If there are no changes, short circuit and don't sync anything - if !e.HasChanged() || a.Sync == nil || len(a.Sync) == 0 { + if !e.HasChanged() || len(a.Sync) == 0 { return nil, nil } @@ -51,16 +52,17 @@ func NewItem(a *latest.Artifact, e watch.Events, builds []build.Artifact) (*Item toDelete, err := intersect(a.Workspace, a.Sync, e.Deleted) if err != nil { - return nil, errors.Wrap(err, "intersecting sync map and added, modified files") + return nil, errors.Wrap(err, "intersecting sync map and deleted files") } + // Something went wrong, don't sync, rebuild. if toCopy == nil || toDelete == nil { return nil, nil } tag := latestTag(a.ImageName, builds) if tag == "" { - return nil, fmt.Errorf("Could not find latest tag for image %s in builds: %s", a.ImageName, builds) + return nil, fmt.Errorf("could not find latest tag for image %s in builds: %v", a.ImageName, builds) } return &Item{ diff --git a/pkg/skaffold/sync/sync_test.go b/pkg/skaffold/sync/sync_test.go index ea310375177..175c36a5a02 100644 --- a/pkg/skaffold/sync/sync_test.go +++ b/pkg/skaffold/sync/sync_test.go @@ -21,7 +21,6 @@ import ( "github.com/GoogleContainerTools/skaffold/pkg/skaffold/build" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest" - "github.com/GoogleContainerTools/skaffold/pkg/skaffold/watch" "github.com/GoogleContainerTools/skaffold/testutil" )