Skip to content

Commit

Permalink
Move WaitForDeploymentsToStabilize to webhook.
Browse files Browse the repository at this point in the history
  • Loading branch information
tejal29 committed Jun 27, 2019
1 parent fd3d9ab commit 91dca83
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 99 deletions.
2 changes: 1 addition & 1 deletion cmd/skaffold/app/cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ var FlagRegistry = []Flag{
Value: &opts.StatusCheck,
DefValue: true,
FlagAddMethod: "BoolVar",
DefinedOn: []string{"dev", "debug", "deploy"},
DefinedOn: []string{"dev", "debug", "deploy", "run"},
},
}

Expand Down
2 changes: 1 addition & 1 deletion integration/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestDebug(t *testing.T) {
defer stop()

client.WaitForPodsReady(test.pods...)
client.WaitForDeploymentsToStabilize(test.deployments...)

for _, depName := range test.deployments {
deploy := client.GetDeployment(depName)
annotations := deploy.Spec.Template.GetAnnotations()
Expand Down
2 changes: 0 additions & 2 deletions integration/helm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ func TestHelmDeploy(t *testing.T) {

skaffold.Deploy(runArgs...).InDir(helmDir).InNs(ns.Name).WithEnv(env).RunOrFailOutput(t)

client.WaitForDeploymentsToStabilize(depName)

expectedLabels := map[string]string{
"app.kubernetes.io/managed-by": TestVersion,
"release": depName,
Expand Down
1 change: 0 additions & 1 deletion integration/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func TestRun(t *testing.T) {
skaffold.Run(test.args...).WithConfig(test.filename).InDir(test.dir).InNs(ns.Name).WithEnv(test.env).RunOrFailOutput(t)

client.WaitForPodsReady(test.pods...)
client.WaitForDeploymentsToStabilize(test.deployments...)

skaffold.Delete().WithConfig(test.filename).InDir(test.dir).InNs(ns.Name).WithEnv(test.env).RunOrFail(t)
})
Expand Down
51 changes: 0 additions & 51 deletions integration/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,60 +126,13 @@ func (k *NSKubernetesClient) WaitForPodsReady(podNames ...string) {

// GetDeployment gets a deployment by name.
func (k *NSKubernetesClient) GetDeployment(depName string) *appsv1.Deployment {
k.WaitForDeploymentsToStabilize(depName)

dep, err := k.client.AppsV1().Deployments(k.ns).Get(depName, meta_v1.GetOptions{})
if err != nil {
k.t.Fatalf("Could not find deployment: %s in namespace %s", depName, k.ns)
}
return dep
}

// WaitForDeploymentsToStabilize waits for a list of deployments to become stable.
func (k *NSKubernetesClient) WaitForDeploymentsToStabilize(depNames ...string) {
if len(depNames) == 0 {
return
}

logrus.Infoln("Waiting for deployments", depNames, "to stabilize")

ctx, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancelTimeout()

w, err := k.client.AppsV1().Deployments(k.ns).Watch(meta_v1.ListOptions{})
if err != nil {
k.t.Fatalf("Unable to watch deployments: %v", err)
}
defer w.Stop()

deployments := map[string]*appsv1.Deployment{}

for {
waitLoop:
select {
case <-ctx.Done():
k.debug("deployments.apps")
k.debug("pods")
k.t.Fatalf("Timed out waiting for deployments %v to stabilize in namespace %s", depNames, k.ns)

case event := <-w.ResultChan():
dp := event.Object.(*appsv1.Deployment)
logrus.Infof("Deployment %s: Generation %d/%d, Replicas %d/%d", dp.Name, dp.Status.ObservedGeneration, dp.Generation, dp.Status.Replicas, *(dp.Spec.Replicas))

deployments[dp.Name] = dp

for _, depName := range depNames {
if d, present := deployments[depName]; !present || !isStable(d) {
break waitLoop
}
}

logrus.Infoln("Deployments", depNames, "are stable")
return
}
}
}

// debug is used to print all the details about pods or deployments
func (k *NSKubernetesClient) debug(entities string) {
cmd := exec.Command("kubectl", "-n", k.ns, "get", entities, "-oyaml")
Expand All @@ -189,7 +142,3 @@ func (k *NSKubernetesClient) debug(entities string) {
// Use fmt.Println, not logrus, for prettier output
fmt.Println(string(out))
}

func isStable(dp *appsv1.Deployment) bool {
return dp.Generation <= dp.Status.ObservedGeneration && *(dp.Spec.Replicas) == dp.Status.Replicas
}
39 changes: 0 additions & 39 deletions pkg/skaffold/kubernetes/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,10 @@ import (
"fmt"
"time"

"github.com/golang/glog"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

Expand Down Expand Up @@ -121,36 +115,3 @@ func WaitForPodInitialized(ctx context.Context, pods corev1.PodInterface, podNam
return false, nil
})
}

// WaitForDeploymentToStabilize waits until the Deployment has a matching generation/replica count between spec and status.
func WaitForDeploymentToStabilize(ctx context.Context, c kubernetes.Interface, ns, name string, timeout time.Duration) error {
logrus.Infof("Waiting for %s to stabilize", name)

fields := fields.Set{
"metadata.name": name,
"metadata.namespace": ns,
}
w, err := c.AppsV1().Deployments(ns).Watch(meta_v1.ListOptions{
FieldSelector: fields.AsSelector().String(),
})
if err != nil {
return fmt.Errorf("initializing deployment watcher: %s", err)
}

return watchUntilTimeout(ctx, timeout, w, func(event *watch.Event) (bool, error) {
if event.Type == watch.Deleted {
return false, apierrs.NewNotFound(schema.GroupResource{Resource: "deployments"}, "")
}

if dp, ok := event.Object.(*appsv1.Deployment); ok {
if dp.Name == name && dp.Namespace == ns &&
dp.Generation <= dp.Status.ObservedGeneration &&
*(dp.Spec.Replicas) == dp.Status.Replicas {
return true, nil
}
glog.Infof("Waiting for deployment %s to stabilize, generation %v observed generation %v spec.replicas %d status.replicas %d",
name, dp.Generation, dp.Status.ObservedGeneration, *(dp.Spec.Replicas), dp.Status.Replicas)
}
return false, nil
})
}
2 changes: 1 addition & 1 deletion pkg/skaffold/runner/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,4 @@ func TestDeploy(t *testing.T) {
>>>>>>> add status check flag which does nothing for now and test
})
}
}
}
2 changes: 2 additions & 0 deletions pkg/skaffold/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ func (r *SkaffoldRunner) performStatusCheck(out io.Writer) error {
>>>>>>> fix gofmt and integration tests
if err != nil {
fmt.Fprintln(out, err.Error())
} else {
fmt.Fprintln(out, "Deployments stabilized.")
}
return err
>>>>>>> more testing
Expand Down
48 changes: 47 additions & 1 deletion pkg/webhook/kubernetes/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,19 @@ import (
pkgkubernetes "github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes"
"github.com/GoogleContainerTools/skaffold/pkg/webhook/constants"
"github.com/GoogleContainerTools/skaffold/pkg/webhook/labels"
"github.com/golang/glog"
"github.com/google/go-github/github"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)

const (
Expand Down Expand Up @@ -128,7 +135,7 @@ func WaitForDeploymentToStabilize(d *appsv1.Deployment, ip string) error {
if err != nil {
return errors.Wrap(err, "getting clientset")
}
if err := pkgkubernetes.WaitForDeploymentToStabilize(context.Background(), client, d.Namespace, d.Name, 5*time.Minute); err != nil {
if err := waitForDeploymentToStabilize(context.Background(), client, d.Namespace, d.Name, 5*time.Minute); err != nil {
return errors.Wrap(err, "waiting for deployment to stabilize")
}
// wait up to five minutes for the URL to return a valid endpoint
Expand Down Expand Up @@ -166,3 +173,42 @@ func Logs(d *appsv1.Deployment) string {
}
return fmt.Sprintf("Init container logs: \n %s \nContainer Logs: \n %s", initLogs, logs)
}

// waitForDeploymentToStabilize waits until the Deployment has a matching generation/replica count between spec and status.
func waitForDeploymentToStabilize(ctx context.Context, c kubernetes.Interface, ns, name string, timeout time.Duration) error {
logrus.Infof("Waiting for %s to stabilize", name)

fields := fields.Set{
"metadata.name": name,
"metadata.namespace": ns,
}
w, err := c.AppsV1().Deployments(ns).Watch(metav1.ListOptions{
FieldSelector: fields.AsSelector().String(),
})
if err != nil {
return fmt.Errorf("initializing deployment watcher: %s", err)
}

ctx, cancelTimeout := context.WithTimeout(ctx, timeout)
defer cancelTimeout()

for {
select {
case <-ctx.Done():
return errors.New("context closed while waiting for condition")
case event := <-w.ResultChan():
if event.Type == watch.Deleted {
return apierrs.NewNotFound(schema.GroupResource{Resource: "deployments"}, "")
}
if dp, ok := event.Object.(*appsv1.Deployment); ok {
if dp.Name == name && dp.Namespace == ns &&
dp.Generation <= dp.Status.ObservedGeneration &&
*(dp.Spec.Replicas) == dp.Status.Replicas {
return nil
}
glog.Infof("Waiting for deployment %s to stabilize, generation %v observed generation %v spec.replicas %d status.replicas %d",
name, dp.Generation, dp.Status.ObservedGeneration, *(dp.Spec.Replicas), dp.Status.Replicas)
}
}
}
}
3 changes: 1 addition & 2 deletions webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import (
"log"
"net/http"

"github.com/GoogleContainerTools/skaffold/pkg/webhook/gcs"

"github.com/GoogleContainerTools/skaffold/pkg/webhook/constants"
"github.com/GoogleContainerTools/skaffold/pkg/webhook/gcs"
pkggithub "github.com/GoogleContainerTools/skaffold/pkg/webhook/github"
"github.com/GoogleContainerTools/skaffold/pkg/webhook/kubernetes"
"github.com/GoogleContainerTools/skaffold/pkg/webhook/labels"
Expand Down

0 comments on commit 91dca83

Please sign in to comment.