Skip to content

Commit

Permalink
Merge pull request #2661 from priyawadhwa/kubectltest
Browse files Browse the repository at this point in the history
Add unit tests to kubectl forwarder
  • Loading branch information
priyawadhwa authored Aug 20, 2019
2 parents 56264fe + 692dd80 commit 27db54b
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 13 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/gorilla/mux v1.6.2 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.8.5
github.com/hinshun/vt10x v0.0.0-20180809195222-d55458df857c // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/imdario/mergo v0.3.6
github.com/karrick/godirwalk v1.7.5
github.com/knative/pkg v0.0.0-20190730155243-972acd413fb9 // indirect
github.com/krishicks/yaml-patch v0.0.10
Expand Down
2 changes: 1 addition & 1 deletion pkg/skaffold/kubernetes/portforward/entry_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func NewEntryManager(out io.Writer, cli *kubectl.CLI) EntryManager {
output: out,
forwardedPorts: newForwardedPorts(),
forwardedResources: newForwardedResources(),
EntryForwarder: &KubectlForwarder{kubectl: cli, out: out},
EntryForwarder: NewKubectlForwarder(out, cli),
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/skaffold/kubernetes/portforward/entry_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestNewEntryManager(t *testing.T) {
output: out,
forwardedPorts: newForwardedPorts(),
forwardedResources: newForwardedResources(),
EntryForwarder: &KubectlForwarder{kubectl: cli, out: out},
EntryForwarder: NewKubectlForwarder(out, cli),
}
actual := NewEntryManager(out, cli)
if !reflect.DeepEqual(expected, actual) {
Expand Down
42 changes: 32 additions & 10 deletions pkg/skaffold/kubernetes/portforward/kubectl_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ type KubectlForwarder struct {
out io.Writer
}

// NewKubectlForwarder returns a new KubectlForwarder
func NewKubectlForwarder(out io.Writer, cli *kubectl.CLI) *KubectlForwarder {
return &KubectlForwarder{
out: out,
kubectl: cli,
}
}

var (
// For testing
isPortFree = util.IsPortFree
portForwardCmd = portForwardCommand
deferFunc = func() {}
)

// Forward port-forwards a pod using kubectl port-forward in the background
// It kills the command on errors in the kubectl port-forward log
// It restarts the command if it was not cancelled by skaffold
Expand All @@ -54,6 +69,8 @@ func (k *KubectlForwarder) Forward(parentCtx context.Context, pfe *portForwardEn

func (k *KubectlForwarder) forward(parentCtx context.Context, pfe *portForwardEntry) {
var notifiedUser bool
defer deferFunc()

for {
pfe.terminationLock.Lock()
if pfe.terminated {
Expand All @@ -63,7 +80,7 @@ func (k *KubectlForwarder) forward(parentCtx context.Context, pfe *portForwardEn
}
pfe.terminationLock.Unlock()

if !util.IsPortFree(pfe.localPort) {
if !isPortFree(pfe.localPort) {
//assuming that Skaffold brokered ports don't overlap, this has to be an external process that started
//since the dev loop kicked off. We are notifying the user in the hope that they can fix it
color.Red.Fprintf(k.out, "failed to port forward %v, port %d is taken, retrying...\n", pfe, pfe.localPort)
Expand All @@ -80,16 +97,8 @@ func (k *KubectlForwarder) forward(parentCtx context.Context, pfe *portForwardEn
ctx, cancel := context.WithCancel(parentCtx)
pfe.cancel = cancel

cmd := k.kubectl.Command(ctx,
"port-forward",
"--pod-running-timeout", "1s",
fmt.Sprintf("%s/%s", pfe.resource.Type, pfe.resource.Name),
fmt.Sprintf("%d:%d", pfe.localPort, pfe.resource.Port),
"--namespace", pfe.resource.Namespace,
)
buf := &bytes.Buffer{}
cmd.Stdout = buf
cmd.Stderr = buf
cmd := portForwardCmd(ctx, k.kubectl, pfe, buf)

if err := cmd.Start(); err != nil {
if ctx.Err() == context.Canceled {
Expand Down Expand Up @@ -117,6 +126,19 @@ func (k *KubectlForwarder) forward(parentCtx context.Context, pfe *portForwardEn
}
}

func portForwardCommand(ctx context.Context, k *kubectl.CLI, pfe *portForwardEntry, buf io.Writer) *exec.Cmd {
cmd := k.Command(ctx,
"port-forward",
"--pod-running-timeout", "1s",
fmt.Sprintf("%s/%s", pfe.resource.Type, pfe.resource.Name),
fmt.Sprintf("%d:%d", pfe.localPort, pfe.resource.Port),
"--namespace", pfe.resource.Namespace,
)
cmd.Stdout = buf
cmd.Stderr = buf
return cmd
}

// Terminate terminates an existing kubectl port-forward command using SIGTERM
func (*KubectlForwarder) Terminate(p *portForwardEntry) {
logrus.Debugf("Terminating port-forward %v", p)
Expand Down
164 changes: 164 additions & 0 deletions pkg/skaffold/kubernetes/portforward/kubectl_forwarder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
Copyright 2019 The Skaffold Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package portforward

import (
"bytes"
"context"
"os/exec"
"strings"
"sync"
"testing"
"time"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
)

func TestUnavailablePort(t *testing.T) {

original := isPortFree
defer func() { isPortFree = original }()

// Return that the port is false, while also
// adding a sync group so we know when isPortFree
// has been called
portFreeWG := &sync.WaitGroup{}
portFreeWG.Add(1)
isPortFree = func(_ int) bool {
defer portFreeWG.Done()
return false
}

// Create a wait group that will only be
// fulfilled when the forward function returns
forwardFunctionWG := &sync.WaitGroup{}
forwardFunctionWG.Add(1)
originalDefer := deferFunc
defer func() { deferFunc = originalDefer }()
deferFunc = func() { defer forwardFunctionWG.Done() }

buf := bytes.NewBuffer([]byte{})
k := KubectlForwarder{
out: buf,
}
pfe := newPortForwardEntry(0, latest.PortForwardResource{}, "", "", "", 8080, false)
k.Forward(context.Background(), pfe)

// wait for isPortFree to be called
portFreeWG.Wait()

// then, end port forwarding and wait for the forward function to return.
pfe.terminationLock.Lock()
pfe.terminated = true
pfe.terminationLock.Unlock()
forwardFunctionWG.Wait()

// read output to make sure logs are expected
output := buf.String()
if !strings.Contains(output, "port 8080 is taken") {
t.Fatalf("port wasn't available but didn't get warning, got: \n%s", output)
}
}

func TestTerminate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

pfe := newPortForwardEntry(0, latest.PortForwardResource{}, "", "", "", 8080, false)
pfe.cancel = cancel

k := &KubectlForwarder{}
k.Terminate(pfe)
if pfe.terminated != true {
t.Fatalf("expected pfe.terminated to be true after termination")
}
if ctx.Err() != context.Canceled {
t.Fatalf("expected cancel to be called")
}
}

func TestMonitorErrorLogs(t *testing.T) {
tests := []struct {
description string
input string
cmdRunning bool
}{
{
description: "no error logs appear",
input: "some random logs",
cmdRunning: true,
}, {
description: "match on 'error forwarding port'",
input: "error forwarding port 8080",
}, {
description: "match on 'unable to forward'",
input: "unable to forward 8080",
}, {
description: "match on 'error upgrading connection'",
input: "error upgrading connection 8080",
},
}

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

cmd := exec.Command("sleep", "5")
if err := cmd.Start(); err != nil {
t.Fatal("error starting command")
}

wg := &sync.WaitGroup{}
wg.Add(1)

go func() {
defer wg.Done()
k := KubectlForwarder{}
logs := bytes.NewBuffer([]byte(test.input))
k.monitorErrorLogs(ctx, logs, cmd, &portForwardEntry{})
}()

// need to sleep for one second before cancelling the context
// because there is a one second sleep in the switch statement
// of monitorLogs
time.Sleep(1 * time.Second)

// cancel the context and then wait for monitorErrorLogs to return
cancel()
wg.Wait()

// make sure the command is running or killed based on what's expected
if test.cmdRunning {
assertCmdIsRunning(t, cmd)
cmd.Process.Kill()
} else {
assertCmdWasKilled(t, cmd)
}
})
}
}

func assertCmdIsRunning(t *testing.T, cmd *exec.Cmd) {
if cmd.ProcessState != nil {
t.Fatal("cmd was killed but expected to continue running")
}
}

func assertCmdWasKilled(t *testing.T, cmd *exec.Cmd) {
if err := cmd.Wait(); err == nil {
t.Fatal("cmd was not killed but expected to be killed")
}
}

0 comments on commit 27db54b

Please sign in to comment.