Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kube Terminal: Appears to hang if it takes a while to start up #506

Merged
merged 11 commits into from
Sep 22, 2020
19 changes: 15 additions & 4 deletions src/jetstream/plugins/kubernetes/terminal/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
)

const (
helmEndpointType = "helm"
helmRepoEndpointType = "repo"
helmEndpointType = "helm"
helmRepoEndpointType = "repo"
startingProgressMessage = "Waiting for Kubernetes Terminal to start up ..."
)

// PodCreationData stores the clients and names used to create pod and secret
Expand Down Expand Up @@ -87,6 +88,8 @@ func (k *KubeTerminal) createPod(c echo.Context, kubeConfig, kubeVersion string,
Type: "Opaque",
}

sendProgressMessage(ws, startingProgressMessage)

setResourcMetadata(&secretSpec.ObjectMeta, sessionID)

secretSpec.Data = make(map[string][]byte)
Expand All @@ -98,6 +101,7 @@ func (k *KubeTerminal) createPod(c echo.Context, kubeConfig, kubeVersion string,
secretSpec.Data["helm-setup"] = []byte(helmSetup)
}

sendProgressMessage(ws, startingProgressMessage)
_, err = secretClient.Create(secretSpec)
if err != nil {
log.Warnf("Kubernetes Terminal: Unable to create Secret: %+v", err)
Expand Down Expand Up @@ -154,6 +158,8 @@ func (k *KubeTerminal) createPod(c echo.Context, kubeConfig, kubeVersion string,
}
podSpec.Spec.Volumes = volumesSpec

sendProgressMessage(ws, startingProgressMessage)

// Create a new pod
pod, err := podClient.Create(podSpec)
if err != nil {
Expand All @@ -165,12 +171,12 @@ func (k *KubeTerminal) createPod(c echo.Context, kubeConfig, kubeVersion string,
result.PodClient = podClient
result.PodName = podName

sendProgressMessage(ws, "Waiting for Kubernetes Terminal to start up ...")

// Wait for the pod to be running
timeout := 60
statusOptions := metav1.GetOptions{}
for {
// This ensures we keep the web socket alive while the container is creating
sendProgressMessage(ws, startingProgressMessage)
status, err := podClient.Get(pod.Name, statusOptions)
if err == nil && status.Status.Phase == "Running" {
break
Expand Down Expand Up @@ -201,6 +207,11 @@ func setResourcMetadata(metadata *metav1.ObjectMeta, sessionID string) {

// Cleanup the pod and secret
func (k *KubeTerminal) cleanupPodAndSecret(podData *PodCreationData) error {
if podData == nil {
// Already been cleaned up
return nil
}

if len(podData.PodName) > 0 {
//captureBashHistory(podData)
podData.PodClient.Delete(podData.PodName, nil)
Expand Down
68 changes: 47 additions & 21 deletions src/jetstream/plugins/kubernetes/terminal/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ type terminalSize struct {
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second

// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second

// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10

// Time to wait before force close on connection.
closeGracePeriod = 10 * time.Second
)

// Start handles web-socket request to launch a Kubernetes Terminal
Expand All @@ -60,7 +69,7 @@ func (k *KubeTerminal) Start(c echo.Context) error {
if !ok {
return errors.New("Could not get token")
}

// This is the kube config for the kubernetes endpoint that we want configured in the Terminal
kubeConfig, err := k.Kube.GetKubeConfigForEndpoint(cnsiRecord.APIEndpoint.String(), tokenRecord, "")
if err != nil {
Expand All @@ -79,7 +88,10 @@ func (k *KubeTerminal) Start(c echo.Context) error {
defer ws.Close()
defer pingTicker.Stop()

// We are now in web socket land - we don't want any middleware to change the HTTP response
// At this point we aer using web sockets, so we can not return errors to the client as the connection
// has been upgraded to a web socket

// We are now in web socket land - we don't want any middleware to change the HTTP response
c.Set("Stratos-WebSocket", "true")

// Send a message to say that we are creating the pod
Expand All @@ -95,8 +107,8 @@ func (k *KubeTerminal) Start(c echo.Context) error {
k.cleanupPodAndSecret(podData)

// Send error message
sendProgressMessage(ws, "!" + err.Error())
return err
sendProgressMessage(ws, "!"+err.Error())
return nil
}

// API Endpoint to SSH/exec into a container
Expand Down Expand Up @@ -131,36 +143,40 @@ func (k *KubeTerminal) Start(c echo.Context) error {

stdoutDone := make(chan bool)
go pumpStdout(ws, wsConn, stdoutDone)
go ping(ws, stdoutDone)

// If the downstream connection is closed, close the other web socket as well
ws.SetCloseHandler(func (code int, text string) error {
ws.SetCloseHandler(func(code int, text string) error {
wsConn.Close()
// Cleanup
k.cleanupPodAndSecret(podData)
podData = nil
return nil
})

// Wait a while when reading - can take some time for the container to launch
ws.SetReadDeadline(time.Now().Add(pongWait))
ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })

// Read the input from the web socket and pipe it to the SSH client
for {
_, r, err := ws.ReadMessage()
if err != nil {
// Check to see if this was because the web socket was closed cleanly
closed := false
select {
case msg := <-stdoutDone:
closed = msg
}
if !closed {
log.Errorf("Kubernetes terminal: error reading message from web socket: %+v", err)
}
log.Debug("Kube Terminal cleaning up ....")
// Error reading - so clean up
k.cleanupPodAndSecret(podData)
podData = nil

ws.SetWriteDeadline(time.Now().Add(writeWait))
ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
time.Sleep(closeGracePeriod)
ws.Close()

// No point returning an error - we've already upgraded to web sockets, so we can't use the HTTP response now
return nil
}

res := KeyCode{}
json.Unmarshal(r, &res)

if res.Cols == 0 {
slice := make([]byte, 1)
slice[0] = 0
Expand All @@ -177,11 +193,6 @@ func (k *KubeTerminal) Start(c echo.Context) error {
wsConn.WriteMessage(websocket.TextMessage, slice)
}
}

// Cleanup
log.Error("Kubernetes Terminal is cleaning up")

return k.cleanupPodAndSecret(podData)
}

func pumpStdout(ws *websocket.Conn, source *websocket.Conn, done chan bool) {
Expand All @@ -202,3 +213,18 @@ func pumpStdout(ws *websocket.Conn, source *websocket.Conn, done chan bool) {
}
}
}

func ping(ws *websocket.Conn, done chan bool) {
ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)); err != nil {
log.Errorf("Web socket ping error: %+v", err)
}
case <-done:
return
}
}
}