From 72c1f26ba646be2c5abea5c5e9fe5c150020c706 Mon Sep 17 00:00:00 2001 From: Nick Kubala Date: Wed, 24 Jul 2019 15:24:52 -0700 Subject: [PATCH] monitor kubectl logs when port forwarding and retry on error --- integration/dev_test.go | 78 +++++++++++++++++++ .../kubernetes/portforward/entry_manager.go | 13 +++- .../portforward/kubectl_forwarder.go | 35 +++++++-- .../portforward/port_forward_entry.go | 3 + .../portforward/resource_forwarder_test.go | 2 + 5 files changed, 122 insertions(+), 9 deletions(-) diff --git a/integration/dev_test.go b/integration/dev_test.go index 9d1d40bbf93..fb76f47b913 100644 --- a/integration/dev_test.go +++ b/integration/dev_test.go @@ -18,6 +18,10 @@ package integration import ( "context" + "io/ioutil" + "net/http" + "os" + "strings" "testing" "time" @@ -173,3 +177,77 @@ func TestDevAPITriggers(t *testing.T) { }) testutil.CheckError(t, false, err) } + +func TestDevPortForward(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + if ShouldRunGCPOnlyTests() { + t.Skip("skipping test that is not gcp only") + } + + // Run skaffold build first to fail quickly on a build failure + skaffold.Build().InDir("examples/microservices").RunOrFail(t) + + ns, _, deleteNs := SetupNamespace(t) + defer deleteNs() + + stop := skaffold.Dev("--port-forward").InDir("examples/microservices").InNs(ns.Name).RunBackground(t) + defer stop() + + err := wait.PollImmediate(time.Millisecond*500, 10*time.Minute, func() (bool, error) { + resp, err := http.Get("http://localhost:50053") + if err != nil { + return false, nil + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return false, nil + } + return "leeroooooy app!!\n" == string(body), nil + }) + testutil.CheckError(t, false, err) + + original, perms, fErr := replaceInFile("leeroooooy app!!", "test string", "examples/microservices/leeroy-app/app.go") + if fErr != nil { + t.Error(fErr) + } + defer func() { + if original != nil { + ioutil.WriteFile("examples/microservices/leeroy-app/app.go", original, perms) + } + }() + + err = wait.PollImmediate(time.Millisecond*500, 10*time.Minute, func() (bool, error) { + resp, err := http.Get("http://localhost:50053") + if err != nil { + return false, nil + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return false, nil + } + return "test string\n" == string(body), nil + }) + + testutil.CheckError(t, false, err) +} + +func replaceInFile(target, replacement, filepath string) ([]byte, os.FileMode, error) { + fInfo, err := os.Stat(filepath) + if err != nil { + return nil, 0, err + } + original, err := ioutil.ReadFile(filepath) + if err != nil { + return nil, 0, err + } + + newContents := strings.Replace(string(original), target, replacement, -1) + + err = ioutil.WriteFile(filepath, []byte(newContents), 0) + + return original, fInfo.Mode(), err +} diff --git a/pkg/skaffold/kubernetes/portforward/entry_manager.go b/pkg/skaffold/kubernetes/portforward/entry_manager.go index 3951ff2ce14..6a66131b42a 100644 --- a/pkg/skaffold/kubernetes/portforward/entry_manager.go +++ b/pkg/skaffold/kubernetes/portforward/entry_manager.go @@ -165,17 +165,23 @@ func (b *EntryManager) forwardPortForwardEntry(ctx context.Context, entry *portF return nil } b.forwardedResources.Store(entry.key(), entry) - color.Default.Fprintln(b.output, fmt.Sprintf("Port Forwarding %s/%s %d -> %d", entry.resource.Type, entry.resource.Name, entry.resource.Port, entry.localPort)) err := wait.PollImmediate(time.Second, forwardingTimeoutTime, func() (bool, error) { if err := b.Forward(ctx, entry); err != nil { return false, nil } return true, nil }) + + go b.Monitor(entry, func() { + b.Retry(ctx, entry) + }) + if err != nil { return err } + color.Default.Fprintln(b.output, fmt.Sprintf("Port forwarded %s/%s from remote port %d to local port %d", entry.resource.Type, entry.resource.Name, entry.resource.Port, entry.localPort)) + portForwardEvent(entry) return nil } @@ -193,3 +199,8 @@ func (b *EntryManager) Terminate(p *portForwardEntry) { b.forwardedPorts.Delete(p.localPort) b.EntryForwarder.Terminate(p) } + +func (b *EntryManager) Retry(ctx context.Context, p *portForwardEntry) error { + b.Terminate(p) + return b.forwardPortForwardEntry(ctx, p) +} diff --git a/pkg/skaffold/kubernetes/portforward/kubectl_forwarder.go b/pkg/skaffold/kubernetes/portforward/kubectl_forwarder.go index aa02d3eb0ca..795ddaf67f0 100644 --- a/pkg/skaffold/kubernetes/portforward/kubectl_forwarder.go +++ b/pkg/skaffold/kubernetes/portforward/kubectl_forwarder.go @@ -22,6 +22,7 @@ import ( "fmt" "net" "os/exec" + "strings" "time" "k8s.io/apimachinery/pkg/util/wait" @@ -35,6 +36,7 @@ import ( type EntryForwarder interface { Forward(parentCtx context.Context, pfe *portForwardEntry) error Terminate(p *portForwardEntry) + Monitor(*portForwardEntry, func()) } type KubectlForwarder struct{} @@ -42,8 +44,6 @@ type KubectlForwarder struct{} // Forward port-forwards a pod using kubectl port-forward // It returns an error only if the process fails or was terminated by a signal other than SIGTERM func (*KubectlForwarder) Forward(parentCtx context.Context, pfe *portForwardEntry) error { - logrus.Debugf("Port forwarding %v", pfe) - ctx, cancel := context.WithCancel(parentCtx) // when retrying a portforwarding entry, it might already have a context running if pfe.cancel != nil { @@ -51,23 +51,23 @@ func (*KubectlForwarder) Forward(parentCtx context.Context, pfe *portForwardEntr } pfe.cancel = cancel - cmd := exec.CommandContext(ctx, "kubectl", "port-forward", fmt.Sprintf("%s/%s", pfe.resource.Type, pfe.resource.Name), fmt.Sprintf("%d:%d", pfe.localPort, pfe.resource.Port), "--namespace", pfe.resource.Namespace) - buf := &bytes.Buffer{} - cmd.Stdout = buf - cmd.Stderr = buf + cmd := exec.CommandContext(ctx, "kubectl", "port-forward", "--pod-running-timeout", "5s", fmt.Sprintf("%s/%s", pfe.resource.Type, pfe.resource.Name), fmt.Sprintf("%d:%d", pfe.localPort, pfe.resource.Port), "--namespace", pfe.resource.Namespace) + pfe.logBuffer = &bytes.Buffer{} + cmd.Stdout = pfe.logBuffer + cmd.Stderr = pfe.logBuffer if err := cmd.Start(); err != nil { if errors.Cause(err) == context.Canceled { return nil } - return errors.Wrapf(err, "port forwarding %s/%s, port: %d to local port: %d, err: %s", pfe.resource.Type, pfe.resource.Name, pfe.resource.Port, pfe.localPort, buf.String()) + return errors.Wrapf(err, "port forwarding %s/%s, port: %d to local port: %d, err: %s", pfe.resource.Type, pfe.resource.Name, pfe.resource.Port, pfe.localPort, pfe.logBuffer.String()) } resultChan := make(chan error, 1) go func() { err := cmd.Wait() if err != nil { - logrus.Debugf("port forwarding %v terminated: %s, output: %s", pfe, err, buf.String()) + logrus.Debugf("port forwarding %v terminated: %s, output: %s", pfe, err, pfe.logBuffer.String()) resultChan <- err } }() @@ -96,3 +96,22 @@ func (*KubectlForwarder) Terminate(p *portForwardEntry) { p.cancel() } } + +// Monitor monitors the logs for a kubectl port forward command +// If it sees an error, it calls back to the EntryManager to +// retry the entire port forward operation. +func (*KubectlForwarder) Monitor(p *portForwardEntry, retryFunc func()) { + for { + time.Sleep(1 * time.Second) + s, _ := p.logBuffer.ReadString(byte('\n')) + if s != "" { + logrus.Tracef("[port-forward] %s", s) + if strings.Contains(s, "error forwarding port") { + // kubectl is having an error. retry the command + logrus.Infof("retrying kubectl port-forward due to error: %s", s) + go retryFunc() + return + } + } + } +} diff --git a/pkg/skaffold/kubernetes/portforward/port_forward_entry.go b/pkg/skaffold/kubernetes/portforward/port_forward_entry.go index 2cb0c846f07..38e15481dc3 100644 --- a/pkg/skaffold/kubernetes/portforward/port_forward_entry.go +++ b/pkg/skaffold/kubernetes/portforward/port_forward_entry.go @@ -17,6 +17,7 @@ limitations under the License. package portforward import ( + "bytes" "context" "fmt" @@ -32,6 +33,8 @@ type portForwardEntry struct { localPort int automaticPodForwarding bool + logBuffer *bytes.Buffer + cancel context.CancelFunc } diff --git a/pkg/skaffold/kubernetes/portforward/resource_forwarder_test.go b/pkg/skaffold/kubernetes/portforward/resource_forwarder_test.go index fefde6469bd..fc6242e616d 100644 --- a/pkg/skaffold/kubernetes/portforward/resource_forwarder_test.go +++ b/pkg/skaffold/kubernetes/portforward/resource_forwarder_test.go @@ -45,6 +45,8 @@ func (f *testForwarder) Forward(ctx context.Context, pfe *portForwardEntry) erro return f.forwardErr } +func (f *testForwarder) Monitor(_ *portForwardEntry, _ func()) {} + func (f *testForwarder) Terminate(pfe *portForwardEntry) { f.forwardedResources.Delete(pfe.key()) f.forwardedPorts.Delete(pfe.resource.Port)