diff --git a/go.mod b/go.mod index 90cda333e0a..67d329eb50c 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/gorilla/mux v1.6.2 // indirect github.com/grpc-ecosystem/grpc-gateway v1.8.5 github.com/hinshun/vt10x v0.0.0-20180809195222-d55458df857c // indirect - github.com/imdario/mergo v0.3.6 // indirect + github.com/imdario/mergo v0.3.6 github.com/karrick/godirwalk v1.7.5 github.com/knative/pkg v0.0.0-20190730155243-972acd413fb9 // indirect github.com/krishicks/yaml-patch v0.0.10 diff --git a/pkg/skaffold/kubernetes/portforward/entry_manager.go b/pkg/skaffold/kubernetes/portforward/entry_manager.go index 6338c08bd0c..cb8f6d2a518 100644 --- a/pkg/skaffold/kubernetes/portforward/entry_manager.go +++ b/pkg/skaffold/kubernetes/portforward/entry_manager.go @@ -156,7 +156,7 @@ func NewEntryManager(out io.Writer, cli *kubectl.CLI) EntryManager { output: out, forwardedPorts: newForwardedPorts(), forwardedResources: newForwardedResources(), - EntryForwarder: &KubectlForwarder{kubectl: cli, out: out}, + EntryForwarder: NewKubectlForwarder(out, cli), } } diff --git a/pkg/skaffold/kubernetes/portforward/entry_manager_test.go b/pkg/skaffold/kubernetes/portforward/entry_manager_test.go index 8c78ecb5dd1..ed25d5e49a3 100644 --- a/pkg/skaffold/kubernetes/portforward/entry_manager_test.go +++ b/pkg/skaffold/kubernetes/portforward/entry_manager_test.go @@ -39,7 +39,7 @@ func TestNewEntryManager(t *testing.T) { output: out, forwardedPorts: newForwardedPorts(), forwardedResources: newForwardedResources(), - EntryForwarder: &KubectlForwarder{kubectl: cli, out: out}, + EntryForwarder: NewKubectlForwarder(out, cli), } actual := NewEntryManager(out, cli) if !reflect.DeepEqual(expected, actual) { diff --git a/pkg/skaffold/kubernetes/portforward/kubectl_forwarder.go b/pkg/skaffold/kubernetes/portforward/kubectl_forwarder.go index f6cb678aa82..1722fa8bbad 100644 --- a/pkg/skaffold/kubernetes/portforward/kubectl_forwarder.go +++ b/pkg/skaffold/kubernetes/portforward/kubectl_forwarder.go @@ -44,6 +44,21 @@ type KubectlForwarder struct { out io.Writer } +// NewKubectlForwarder returns a new KubectlForwarder +func NewKubectlForwarder(out io.Writer, cli *kubectl.CLI) *KubectlForwarder { + return &KubectlForwarder{ + out: out, + kubectl: cli, + } +} + +var ( + // For testing + isPortFree = util.IsPortFree + portForwardCmd = portForwardCommand + deferFunc = func() {} +) + // Forward port-forwards a pod using kubectl port-forward in the background // It kills the command on errors in the kubectl port-forward log // It restarts the command if it was not cancelled by skaffold @@ -54,6 +69,8 @@ func (k *KubectlForwarder) Forward(parentCtx context.Context, pfe *portForwardEn func (k *KubectlForwarder) forward(parentCtx context.Context, pfe *portForwardEntry) { var notifiedUser bool + defer deferFunc() + for { pfe.terminationLock.Lock() if pfe.terminated { @@ -63,7 +80,7 @@ func (k *KubectlForwarder) forward(parentCtx context.Context, pfe *portForwardEn } pfe.terminationLock.Unlock() - if !util.IsPortFree(pfe.localPort) { + if !isPortFree(pfe.localPort) { //assuming that Skaffold brokered ports don't overlap, this has to be an external process that started //since the dev loop kicked off. We are notifying the user in the hope that they can fix it color.Red.Fprintf(k.out, "failed to port forward %v, port %d is taken, retrying...\n", pfe, pfe.localPort) @@ -80,16 +97,8 @@ func (k *KubectlForwarder) forward(parentCtx context.Context, pfe *portForwardEn ctx, cancel := context.WithCancel(parentCtx) pfe.cancel = cancel - cmd := k.kubectl.Command(ctx, - "port-forward", - "--pod-running-timeout", "1s", - fmt.Sprintf("%s/%s", pfe.resource.Type, pfe.resource.Name), - fmt.Sprintf("%d:%d", pfe.localPort, pfe.resource.Port), - "--namespace", pfe.resource.Namespace, - ) buf := &bytes.Buffer{} - cmd.Stdout = buf - cmd.Stderr = buf + cmd := portForwardCmd(ctx, k.kubectl, pfe, buf) if err := cmd.Start(); err != nil { if ctx.Err() == context.Canceled { @@ -117,6 +126,19 @@ func (k *KubectlForwarder) forward(parentCtx context.Context, pfe *portForwardEn } } +func portForwardCommand(ctx context.Context, k *kubectl.CLI, pfe *portForwardEntry, buf io.Writer) *exec.Cmd { + cmd := k.Command(ctx, + "port-forward", + "--pod-running-timeout", "1s", + fmt.Sprintf("%s/%s", pfe.resource.Type, pfe.resource.Name), + fmt.Sprintf("%d:%d", pfe.localPort, pfe.resource.Port), + "--namespace", pfe.resource.Namespace, + ) + cmd.Stdout = buf + cmd.Stderr = buf + return cmd +} + // Terminate terminates an existing kubectl port-forward command using SIGTERM func (*KubectlForwarder) Terminate(p *portForwardEntry) { logrus.Debugf("Terminating port-forward %v", p) diff --git a/pkg/skaffold/kubernetes/portforward/kubectl_forwarder_test.go b/pkg/skaffold/kubernetes/portforward/kubectl_forwarder_test.go new file mode 100644 index 00000000000..847dbe0c200 --- /dev/null +++ b/pkg/skaffold/kubernetes/portforward/kubectl_forwarder_test.go @@ -0,0 +1,164 @@ +/* +Copyright 2019 The Skaffold Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package portforward + +import ( + "bytes" + "context" + "os/exec" + "strings" + "sync" + "testing" + "time" + + "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest" +) + +func TestUnavailablePort(t *testing.T) { + + original := isPortFree + defer func() { isPortFree = original }() + + // Return that the port is false, while also + // adding a sync group so we know when isPortFree + // has been called + portFreeWG := &sync.WaitGroup{} + portFreeWG.Add(1) + isPortFree = func(_ int) bool { + defer portFreeWG.Done() + return false + } + + // Create a wait group that will only be + // fulfilled when the forward function returns + forwardFunctionWG := &sync.WaitGroup{} + forwardFunctionWG.Add(1) + originalDefer := deferFunc + defer func() { deferFunc = originalDefer }() + deferFunc = func() { defer forwardFunctionWG.Done() } + + buf := bytes.NewBuffer([]byte{}) + k := KubectlForwarder{ + out: buf, + } + pfe := newPortForwardEntry(0, latest.PortForwardResource{}, "", "", "", 8080, false) + k.Forward(context.Background(), pfe) + + // wait for isPortFree to be called + portFreeWG.Wait() + + // then, end port forwarding and wait for the forward function to return. + pfe.terminationLock.Lock() + pfe.terminated = true + pfe.terminationLock.Unlock() + forwardFunctionWG.Wait() + + // read output to make sure logs are expected + output := buf.String() + if !strings.Contains(output, "port 8080 is taken") { + t.Fatalf("port wasn't available but didn't get warning, got: \n%s", output) + } +} + +func TestTerminate(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + pfe := newPortForwardEntry(0, latest.PortForwardResource{}, "", "", "", 8080, false) + pfe.cancel = cancel + + k := &KubectlForwarder{} + k.Terminate(pfe) + if pfe.terminated != true { + t.Fatalf("expected pfe.terminated to be true after termination") + } + if ctx.Err() != context.Canceled { + t.Fatalf("expected cancel to be called") + } +} + +func TestMonitorErrorLogs(t *testing.T) { + tests := []struct { + description string + input string + cmdRunning bool + }{ + { + description: "no error logs appear", + input: "some random logs", + cmdRunning: true, + }, { + description: "match on 'error forwarding port'", + input: "error forwarding port 8080", + }, { + description: "match on 'unable to forward'", + input: "unable to forward 8080", + }, { + description: "match on 'error upgrading connection'", + input: "error upgrading connection 8080", + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + cmd := exec.Command("sleep", "5") + if err := cmd.Start(); err != nil { + t.Fatal("error starting command") + } + + wg := &sync.WaitGroup{} + wg.Add(1) + + go func() { + defer wg.Done() + k := KubectlForwarder{} + logs := bytes.NewBuffer([]byte(test.input)) + k.monitorErrorLogs(ctx, logs, cmd, &portForwardEntry{}) + }() + + // need to sleep for one second before cancelling the context + // because there is a one second sleep in the switch statement + // of monitorLogs + time.Sleep(1 * time.Second) + + // cancel the context and then wait for monitorErrorLogs to return + cancel() + wg.Wait() + + // make sure the command is running or killed based on what's expected + if test.cmdRunning { + assertCmdIsRunning(t, cmd) + cmd.Process.Kill() + } else { + assertCmdWasKilled(t, cmd) + } + }) + } +} + +func assertCmdIsRunning(t *testing.T, cmd *exec.Cmd) { + if cmd.ProcessState != nil { + t.Fatal("cmd was killed but expected to continue running") + } +} + +func assertCmdWasKilled(t *testing.T, cmd *exec.Cmd) { + if err := cmd.Wait(); err == nil { + t.Fatal("cmd was not killed but expected to be killed") + } +}