Skip to content

Commit

Permalink
monitor kubectl logs when port forwarding and retry on error
Browse files Browse the repository at this point in the history
  • Loading branch information
nkubala committed Jul 25, 2019
1 parent 3df4863 commit 72c1f26
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 9 deletions.
78 changes: 78 additions & 0 deletions integration/dev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ package integration

import (
"context"
"io/ioutil"
"net/http"
"os"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -173,3 +177,77 @@ func TestDevAPITriggers(t *testing.T) {
})
testutil.CheckError(t, false, err)
}

func TestDevPortForward(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
if ShouldRunGCPOnlyTests() {
t.Skip("skipping test that is not gcp only")
}

// Run skaffold build first to fail quickly on a build failure
skaffold.Build().InDir("examples/microservices").RunOrFail(t)

ns, _, deleteNs := SetupNamespace(t)
defer deleteNs()

stop := skaffold.Dev("--port-forward").InDir("examples/microservices").InNs(ns.Name).RunBackground(t)
defer stop()

err := wait.PollImmediate(time.Millisecond*500, 10*time.Minute, func() (bool, error) {
resp, err := http.Get("http://localhost:50053")
if err != nil {
return false, nil
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return false, nil
}
return "leeroooooy app!!\n" == string(body), nil
})
testutil.CheckError(t, false, err)

original, perms, fErr := replaceInFile("leeroooooy app!!", "test string", "examples/microservices/leeroy-app/app.go")
if fErr != nil {
t.Error(fErr)
}
defer func() {
if original != nil {
ioutil.WriteFile("examples/microservices/leeroy-app/app.go", original, perms)
}
}()

err = wait.PollImmediate(time.Millisecond*500, 10*time.Minute, func() (bool, error) {
resp, err := http.Get("http://localhost:50053")
if err != nil {
return false, nil
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return false, nil
}
return "test string\n" == string(body), nil
})

testutil.CheckError(t, false, err)
}

func replaceInFile(target, replacement, filepath string) ([]byte, os.FileMode, error) {
fInfo, err := os.Stat(filepath)
if err != nil {
return nil, 0, err
}
original, err := ioutil.ReadFile(filepath)
if err != nil {
return nil, 0, err
}

newContents := strings.Replace(string(original), target, replacement, -1)

err = ioutil.WriteFile(filepath, []byte(newContents), 0)

return original, fInfo.Mode(), err
}
13 changes: 12 additions & 1 deletion pkg/skaffold/kubernetes/portforward/entry_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,17 +165,23 @@ func (b *EntryManager) forwardPortForwardEntry(ctx context.Context, entry *portF
return nil
}
b.forwardedResources.Store(entry.key(), entry)
color.Default.Fprintln(b.output, fmt.Sprintf("Port Forwarding %s/%s %d -> %d", entry.resource.Type, entry.resource.Name, entry.resource.Port, entry.localPort))
err := wait.PollImmediate(time.Second, forwardingTimeoutTime, func() (bool, error) {
if err := b.Forward(ctx, entry); err != nil {
return false, nil
}
return true, nil
})

go b.Monitor(entry, func() {
b.Retry(ctx, entry)
})

if err != nil {
return err
}

color.Default.Fprintln(b.output, fmt.Sprintf("Port forwarded %s/%s from remote port %d to local port %d", entry.resource.Type, entry.resource.Name, entry.resource.Port, entry.localPort))

portForwardEvent(entry)
return nil
}
Expand All @@ -193,3 +199,8 @@ func (b *EntryManager) Terminate(p *portForwardEntry) {
b.forwardedPorts.Delete(p.localPort)
b.EntryForwarder.Terminate(p)
}

func (b *EntryManager) Retry(ctx context.Context, p *portForwardEntry) error {
b.Terminate(p)
return b.forwardPortForwardEntry(ctx, p)
}
35 changes: 27 additions & 8 deletions pkg/skaffold/kubernetes/portforward/kubectl_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"net"
"os/exec"
"strings"
"time"

"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -35,39 +36,38 @@ import (
type EntryForwarder interface {
Forward(parentCtx context.Context, pfe *portForwardEntry) error
Terminate(p *portForwardEntry)
Monitor(*portForwardEntry, func())
}

type KubectlForwarder struct{}

// Forward port-forwards a pod using kubectl port-forward
// It returns an error only if the process fails or was terminated by a signal other than SIGTERM
func (*KubectlForwarder) Forward(parentCtx context.Context, pfe *portForwardEntry) error {
logrus.Debugf("Port forwarding %v", pfe)

ctx, cancel := context.WithCancel(parentCtx)
// when retrying a portforwarding entry, it might already have a context running
if pfe.cancel != nil {
pfe.cancel()
}
pfe.cancel = cancel

cmd := exec.CommandContext(ctx, "kubectl", "port-forward", fmt.Sprintf("%s/%s", pfe.resource.Type, pfe.resource.Name), fmt.Sprintf("%d:%d", pfe.localPort, pfe.resource.Port), "--namespace", pfe.resource.Namespace)
buf := &bytes.Buffer{}
cmd.Stdout = buf
cmd.Stderr = buf
cmd := exec.CommandContext(ctx, "kubectl", "port-forward", "--pod-running-timeout", "5s", fmt.Sprintf("%s/%s", pfe.resource.Type, pfe.resource.Name), fmt.Sprintf("%d:%d", pfe.localPort, pfe.resource.Port), "--namespace", pfe.resource.Namespace)
pfe.logBuffer = &bytes.Buffer{}
cmd.Stdout = pfe.logBuffer
cmd.Stderr = pfe.logBuffer

if err := cmd.Start(); err != nil {
if errors.Cause(err) == context.Canceled {
return nil
}
return errors.Wrapf(err, "port forwarding %s/%s, port: %d to local port: %d, err: %s", pfe.resource.Type, pfe.resource.Name, pfe.resource.Port, pfe.localPort, buf.String())
return errors.Wrapf(err, "port forwarding %s/%s, port: %d to local port: %d, err: %s", pfe.resource.Type, pfe.resource.Name, pfe.resource.Port, pfe.localPort, pfe.logBuffer.String())
}

resultChan := make(chan error, 1)
go func() {
err := cmd.Wait()
if err != nil {
logrus.Debugf("port forwarding %v terminated: %s, output: %s", pfe, err, buf.String())
logrus.Debugf("port forwarding %v terminated: %s, output: %s", pfe, err, pfe.logBuffer.String())
resultChan <- err
}
}()
Expand Down Expand Up @@ -96,3 +96,22 @@ func (*KubectlForwarder) Terminate(p *portForwardEntry) {
p.cancel()
}
}

// Monitor monitors the logs for a kubectl port forward command
// If it sees an error, it calls back to the EntryManager to
// retry the entire port forward operation.
func (*KubectlForwarder) Monitor(p *portForwardEntry, retryFunc func()) {
for {
time.Sleep(1 * time.Second)
s, _ := p.logBuffer.ReadString(byte('\n'))
if s != "" {
logrus.Tracef("[port-forward] %s", s)
if strings.Contains(s, "error forwarding port") {
// kubectl is having an error. retry the command
logrus.Infof("retrying kubectl port-forward due to error: %s", s)
go retryFunc()
return
}
}
}
}
3 changes: 3 additions & 0 deletions pkg/skaffold/kubernetes/portforward/port_forward_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package portforward

import (
"bytes"
"context"
"fmt"

Expand All @@ -32,6 +33,8 @@ type portForwardEntry struct {
localPort int
automaticPodForwarding bool

logBuffer *bytes.Buffer

cancel context.CancelFunc
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func (f *testForwarder) Forward(ctx context.Context, pfe *portForwardEntry) erro
return f.forwardErr
}

func (f *testForwarder) Monitor(_ *portForwardEntry, _ func()) {}

func (f *testForwarder) Terminate(pfe *portForwardEntry) {
f.forwardedResources.Delete(pfe.key())
f.forwardedPorts.Delete(pfe.resource.Port)
Expand Down

0 comments on commit 72c1f26

Please sign in to comment.