Skip to content

Commit

Permalink
Move WaitForDeploymentsToStabilize to webhook.
Browse files Browse the repository at this point in the history
  • Loading branch information
tejal29 committed Jul 10, 2019
1 parent c69452c commit bc44cc7
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 269 deletions.
14 changes: 1 addition & 13 deletions cmd/skaffold/app/cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,23 +222,11 @@ var FlagRegistry = []Flag{
},
{
Name: "status-check",
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
Usage: "Wait for deployed resources to stabilize",
=======
Usage: "",
>>>>>>> wip
=======
Usage: "Wait for deployed resources to stabalize",
>>>>>>> fix linter
=======
Usage: "Wait for deployed resources to stabilize",
>>>>>>> address @balintp's comment
Value: &opts.StatusCheck,
DefValue: true,
FlagAddMethod: "BoolVar",
DefinedOn: []string{"dev", "debug", "deploy"},
DefinedOn: []string{"dev", "debug", "deploy", "run"},
},
}

Expand Down
21 changes: 0 additions & 21 deletions docs/content/en/docs/references/cli/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -300,13 +300,6 @@ Flags:
--rpc-http-port int tcp port to expose event REST API over HTTP (default 50052)
--rpc-port int tcp port to expose event API (default 50051)
--skip-tests Whether to skip the tests after building
<<<<<<< HEAD
<<<<<<< HEAD
=======
--status-check Wait for deployed resources to stabalize (default true)
>>>>>>> fix linter
=======
>>>>>>> address @balintp's comment
--tail Stream logs from deployed objects (default true)
--toot Emit a terminal beep after the deploy is complete
Expand Down Expand Up @@ -410,13 +403,6 @@ Flags:
-p, --profile strings Activate profiles by name
--rpc-http-port int tcp port to expose event REST API over HTTP (default 50052)
--rpc-port int tcp port to expose event API (default 50051)
<<<<<<< HEAD
<<<<<<< HEAD
=======
--status-check Wait for deployed resources to stabalize (default true)
>>>>>>> fix linter
=======
>>>>>>> address @balintp's comment
--tail Stream logs from deployed objects (default false)
--toot Emit a terminal beep after the deploy is complete
Expand Down Expand Up @@ -500,13 +486,6 @@ Flags:
--rpc-http-port int tcp port to expose event REST API over HTTP (default 50052)
--rpc-port int tcp port to expose event API (default 50051)
--skip-tests Whether to skip the tests after building
<<<<<<< HEAD
<<<<<<< HEAD
=======
--status-check Wait for deployed resources to stabalize (default true)
>>>>>>> fix linter
=======
>>>>>>> address @balintp's comment
--tail Stream logs from deployed objects (default true)
--toot Emit a terminal beep after the deploy is complete
--trigger string How are changes detected? (polling, manual or notify) (default "polling")
Expand Down
1 change: 1 addition & 0 deletions integration/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestDebug(t *testing.T) {
defer stop()

client.WaitForPodsReady(test.pods...)

for _, depName := range test.deployments {
deploy := client.GetDeployment(depName)

Expand Down
2 changes: 0 additions & 2 deletions integration/helm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ func TestHelmDeploy(t *testing.T) {

skaffold.Deploy(runArgs...).InDir(helmDir).InNs(ns.Name).WithEnv(env).RunOrFailOutput(t)

client.WaitForDeploymentsToStabilize(depName)

expectedLabels := map[string]string{
"app.kubernetes.io/managed-by": TestVersion,
"release": depName,
Expand Down
1 change: 0 additions & 1 deletion integration/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func TestRun(t *testing.T) {
skaffold.Run(test.args...).WithConfig(test.filename).InDir(test.dir).InNs(ns.Name).WithEnv(test.env).RunOrFailOutput(t)

client.WaitForPodsReady(test.pods...)
client.WaitForDeploymentsToStabilize(test.deployments...)

skaffold.Delete().WithConfig(test.filename).InDir(test.dir).InNs(ns.Name).WithEnv(test.env).RunOrFail(t)
})
Expand Down
52 changes: 0 additions & 52 deletions integration/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,62 +128,13 @@ func (k *NSKubernetesClient) WaitForPodsReady(podNames ...string) {

// GetDeployment gets a deployment by name.
func (k *NSKubernetesClient) GetDeployment(depName string) *appsv1.Deployment {
k.WaitForDeploymentsToStabilize(depName)

dep, err := k.client.AppsV1().Deployments(k.ns).Get(depName, meta_v1.GetOptions{})
if err != nil {
k.t.Fatalf("Could not find deployment: %s in namespace %s", depName, k.ns)
}
return dep
}

// WaitForDeploymentsToStabilize waits for a list of deployments to become stable.
func (k *NSKubernetesClient) WaitForDeploymentsToStabilize(depNames ...string) {
if len(depNames) == 0 {
return
}

logrus.Infoln("Waiting for deployments", depNames, "to stabilize")

ctx, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancelTimeout()

w, err := k.client.AppsV1().Deployments(k.ns).Watch(meta_v1.ListOptions{})
if err != nil {
k.t.Fatalf("Unable to watch deployments: %v", err)
}
defer w.Stop()

deployments := map[string]*appsv1.Deployment{}

for {
waitLoop:
select {
case <-ctx.Done():
k.printDiskFreeSpace()
k.debug("nodes")
k.debug("deployments.apps")
k.debug("pods")
k.t.Fatalf("Timed out waiting for deployments %v to stabilize in namespace %s", depNames, k.ns)

case event := <-w.ResultChan():
dp := event.Object.(*appsv1.Deployment)
logrus.Infof("Deployment %s: Generation %d/%d, Replicas %d/%d", dp.Name, dp.Status.ObservedGeneration, dp.Generation, dp.Status.Replicas, *(dp.Spec.Replicas))

deployments[dp.Name] = dp

for _, depName := range depNames {
if d, present := deployments[depName]; !present || !isStable(d) {
break waitLoop
}
}

logrus.Infoln("Deployments", depNames, "are stable")
return
}
}
}

// debug is used to print all the details about pods or deployments
func (k *NSKubernetesClient) debug(entities string) {
cmd := exec.Command("kubectl", "-n", k.ns, "get", entities, "-oyaml")
Expand All @@ -200,6 +151,3 @@ func (k *NSKubernetesClient) printDiskFreeSpace() {
fmt.Println(string(out))
}

func isStable(dp *appsv1.Deployment) bool {
return dp.Generation <= dp.Status.ObservedGeneration && *(dp.Spec.Replicas) == dp.Status.Replicas
}
39 changes: 0 additions & 39 deletions pkg/skaffold/kubernetes/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,10 @@ import (
"fmt"
"time"

"github.com/golang/glog"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

Expand Down Expand Up @@ -121,36 +115,3 @@ func WaitForPodInitialized(ctx context.Context, pods corev1.PodInterface, podNam
return false, nil
})
}

// WaitForDeploymentToStabilize waits until the Deployment has a matching generation/replica count between spec and status.
func WaitForDeploymentToStabilize(ctx context.Context, c kubernetes.Interface, ns, name string, timeout time.Duration) error {
logrus.Infof("Waiting for %s to stabilize", name)

fields := fields.Set{
"metadata.name": name,
"metadata.namespace": ns,
}
w, err := c.AppsV1().Deployments(ns).Watch(meta_v1.ListOptions{
FieldSelector: fields.AsSelector().String(),
})
if err != nil {
return fmt.Errorf("initializing deployment watcher: %s", err)
}

return watchUntilTimeout(ctx, timeout, w, func(event *watch.Event) (bool, error) {
if event.Type == watch.Deleted {
return false, apierrs.NewNotFound(schema.GroupResource{Resource: "deployments"}, "")
}

if dp, ok := event.Object.(*appsv1.Deployment); ok {
if dp.Name == name && dp.Namespace == ns &&
dp.Generation <= dp.Status.ObservedGeneration &&
*(dp.Spec.Replicas) == dp.Status.Replicas {
return true, nil
}
glog.Infof("Waiting for deployment %s to stabilize, generation %v observed generation %v spec.replicas %d status.replicas %d",
name, dp.Generation, dp.Status.ObservedGeneration, *(dp.Spec.Replicas), dp.Status.Replicas)
}
return false, nil
})
}
91 changes: 2 additions & 89 deletions pkg/skaffold/runner/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,14 @@ limitations under the License.
package runner

import (
<<<<<<< HEAD
<<<<<<< HEAD
"bytes"
"context"
"errors"
"strings"
=======
"context"
"io/ioutil"
>>>>>>> wip
=======
"bytes"
"context"
"errors"
"strings"
>>>>>>> add status check flag which does nothing for now and test
"testing"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/build"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/deploy"
runcontext "github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/context"
"github.com/GoogleContainerTools/skaffold/testutil"
<<<<<<< HEAD
<<<<<<< HEAD
)

func TestDeploy(t *testing.T) {
Expand All @@ -51,69 +35,26 @@ func TestDeploy(t *testing.T) {
statusCheck bool
shouldErr bool
shouldWait bool
=======
"github.com/pkg/errors"
=======
>>>>>>> add status check flag which does nothing for now and test
)

func TestDeploy(t *testing.T) {
expectedOutput := "Waiting for deployments to stabilize"
var tests = []struct {
description string
testBench *TestBench
statusCheck bool
<<<<<<< HEAD
>>>>>>> wip
=======
shouldErr bool
shouldWait bool
>>>>>>> add status check flag which does nothing for now and test
}{
{
description: "deploy shd perform status check",
testBench: &TestBench{},
statusCheck: true,
<<<<<<< HEAD
<<<<<<< HEAD
shouldWait: true,
=======
>>>>>>> wip
=======
shouldWait: true,
>>>>>>> add status check flag which does nothing for now and test
},
{
description: "deploy shd not perform status check",
testBench: &TestBench{},
},
{
description: "deploy shd not perform status check when deployer is in error",
<<<<<<< HEAD
<<<<<<< HEAD
shouldErr: true,
statusCheck: true,
testBench: &TestBench{deployErrors: []error{errors.New("deploy error")}},
=======
testBench: &TestBench{deployErrors: []error{errors.New("deploy error")}},
shouldError: true,
statusCheck: true,
>>>>>>> wip
=======
shouldErr: true,
statusCheck: true,
testBench: &TestBench{deployErrors: []error{errors.New("deploy error")}},
>>>>>>> add status check flag which does nothing for now and test
},
}

dummyStatusCheck := func(ctx context.Context, l *deploy.DefaultLabeller, runCtx *runcontext.RunContext) error {
return nil
}
originalStatusCheck := deploy.StatusCheck

for _, test := range tests {
<<<<<<< HEAD
<<<<<<< HEAD
testutil.Run(t, test.description, func(t *testutil.T) {

runner := createRunner(t, test.testBench, nil)
Expand All @@ -128,34 +69,6 @@ func TestDeploy(t *testing.T) {
if strings.Contains(out.String(), expectedOutput) != test.shouldWait {
t.Errorf("expected %s to contain %s %t. But found %t", out.String(), expectedOutput, test.shouldWait, !test.shouldWait)
}
=======
t.Run(test.description, func(t *testing.T) {
=======
testutil.Run(t, test.description, func(t *testutil.T) {
>>>>>>> add status check flag which does nothing for now and test

runner := createRunner(t, test.testBench)
runner.runCtx.Opts.StatusCheck = test.statusCheck
out := new(bytes.Buffer)

// Figure out why i can't use t.Override.
// Using t.Override throws an error "reflect: call of reflect.Value.Elem on func Value"
statusCheck = dummyStatusCheck
defer func() { statusCheck = originalStatusCheck }()

err := runner.Deploy(context.Background(), out, []build.Artifact{
{ImageName: "img1", Tag: "img1:tag1"},
{ImageName: "img2", Tag: "img2:tag2"},
})
<<<<<<< HEAD
testutil.CheckError(t, test.shouldError, err)
>>>>>>> wip
=======
t.CheckError(test.shouldErr, err)
if strings.Contains(out.String(), expectedOutput) != test.shouldWait {
t.Errorf("expected %s to contain %s %t. But found %t", out.String(), expectedOutput, test.shouldWait, !test.shouldWait)
}
>>>>>>> add status check flag which does nothing for now and test
})
}
}
}
Loading

0 comments on commit bc44cc7

Please sign in to comment.