diff --git a/src/frontend/packages/core/src/shared/components/ssh-viewer/ssh-viewer.component.ts b/src/frontend/packages/core/src/shared/components/ssh-viewer/ssh-viewer.component.ts
index b2162e00b7..8d9cedccb8 100644
--- a/src/frontend/packages/core/src/shared/components/ssh-viewer/ssh-viewer.component.ts
+++ b/src/frontend/packages/core/src/shared/components/ssh-viewer/ssh-viewer.component.ts
@@ -29,6 +29,8 @@ export class SshViewerComponent implements OnInit, OnDestroy {
public isConnecting = false;
private isDestroying = false;
+ public message = '';
+
@ViewChild('terminal', { static: true }) container: ElementRef;
private xterm: Terminal;
@@ -66,7 +68,6 @@ export class SshViewerComponent implements OnInit, OnDestroy {
this.xterm = new Terminal();
this.xterm.loadAddon(this.xtermFitAddon);
this.xterm.open(this.container.nativeElement);
- // this.xtermFitAddon.fit();
this.resize();
this.xterm.onKey(e => {
@@ -115,8 +116,15 @@ export class SshViewerComponent implements OnInit, OnDestroy {
this.msgSubscription = this.sshStream
.subscribe(
(data: string) => {
- for (const c of data.split(' ')) {
- this.xterm.write(String.fromCharCode(parseInt(c, 16)));
+ // Check for a window title message
+ if (!this.isWindowTitle(data)) {
+ for (const c of data.split(' ')) {
+ this.xterm.write(String.fromCharCode(parseInt(c, 16)));
+ }
+ } else {
+ console.log('Error')
+ const eMsg = this.errorMessage;
+ this.errorMessage = eMsg;
}
},
(err) => {
@@ -130,4 +138,24 @@ export class SshViewerComponent implements OnInit, OnDestroy {
}
);
}
+ private isWindowTitle(data: string): boolean {
+ const chars = data.split(' ');
+ if (chars.length > 4 &&
+ parseInt(chars[0], 16) === 27 &&
+ parseInt(chars[1], 16) === 93 &&
+ parseInt(chars[2], 16) === 50 &&
+ parseInt(chars[3], 16) === 59) {
+ let title = '';
+ for (let i = 4; i < chars.length - 1; i++) {
+ title += String.fromCharCode(parseInt(chars[i], 16));
+ }
+ if (title.length > 0 && title.charAt(0) === '!') {
+ this.errorMessage = title.substr(1);
+ console.log(this.errorMessage);
+ return true;
+ }
+ this.message = title;
+ }
+ return false;
+ }
}
diff --git a/src/jetstream/main.go b/src/jetstream/main.go
index 9b649da31d..f6f90c60c6 100644
--- a/src/jetstream/main.go
+++ b/src/jetstream/main.go
@@ -10,6 +10,7 @@ import (
"fmt"
"io"
"io/ioutil"
+ "math/rand"
"net"
"net/http"
"os"
@@ -115,6 +116,8 @@ func main() {
}
}
+ rand.Seed(time.Now().UnixNano())
+
log.SetOutput(os.Stdout)
log.Info("========================================")
diff --git a/src/jetstream/plugins/kubernetes/api/api.go b/src/jetstream/plugins/kubernetes/api/api.go
new file mode 100644
index 0000000000..b15fc1b3c3
--- /dev/null
+++ b/src/jetstream/plugins/kubernetes/api/api.go
@@ -0,0 +1,12 @@
+package api
+
+import (
+ "github.com/cloudfoundry-incubator/stratos/src/jetstream/repository/interfaces"
+
+ restclient "k8s.io/client-go/rest"
+)
+
+type Kubernetes interface {
+ GetConfigForEndpoint(masterURL string, token interfaces.TokenRecord) (*restclient.Config, error)
+ GetKubeConfigForEndpoint(masterURL string, token interfaces.TokenRecord, namespace string) (string, error)
+}
diff --git a/src/jetstream/plugins/kubernetes/go.mod b/src/jetstream/plugins/kubernetes/go.mod
index 4e384f1314..288ec1d075 100644
--- a/src/jetstream/plugins/kubernetes/go.mod
+++ b/src/jetstream/plugins/kubernetes/go.mod
@@ -19,6 +19,7 @@ require (
github.com/kubernetes-sigs/aws-iam-authenticator v0.3.0
github.com/labstack/echo v3.3.10+incompatible
github.com/russross/blackfriday v2.0.0+incompatible // indirect
+ github.com/satori/go.uuid v1.2.0 // indirect
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
github.com/sirupsen/logrus v1.4.2
github.com/smartystreets/goconvey v1.6.4
diff --git a/src/jetstream/plugins/kubernetes/go.sum b/src/jetstream/plugins/kubernetes/go.sum
index 4af88b969f..d3b2e14ea0 100644
--- a/src/jetstream/plugins/kubernetes/go.sum
+++ b/src/jetstream/plugins/kubernetes/go.sum
@@ -496,6 +496,9 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR
github.com/russross/blackfriday v2.0.0 h1:L7Oc72h7rDqGkbUorN/ncJ4N/y220/YRezHvBoKLOFA=
github.com/russross/blackfriday v2.0.0+incompatible/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday v2.0.0/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
+github.com/russross/blackfriday v2.0.0+incompatible/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
+github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
+github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
diff --git a/src/jetstream/plugins/kubernetes/main.go b/src/jetstream/plugins/kubernetes/main.go
index 0ed9278022..2cee120fcc 100644
--- a/src/jetstream/plugins/kubernetes/main.go
+++ b/src/jetstream/plugins/kubernetes/main.go
@@ -14,12 +14,15 @@ import (
log "github.com/sirupsen/logrus"
"github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/kubernetes/auth"
+
+ "github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/kubernetes/terminal"
)
// KubernetesSpecification is the endpoint that adds Kubernetes support to the backend
type KubernetesSpecification struct {
portalProxy interfaces.PortalProxy
endpointType string
+ kubeTerminal *terminal.KubeTerminal
}
type KubeStatus struct {
@@ -49,12 +52,20 @@ const (
kubeEndpointType = "k8s"
defaultKubeClientID = "K8S_CLIENT"
- // kubeDashboardPluginConfigSetting is config value send back to the client to indicate if the kube dashboard can be navigated to
+ // kubeDashboardPluginConfigSetting is config value sent back to the client to indicate if the kube dashboard ie enabled
kubeDashboardPluginConfigSetting = "kubeDashboardEnabled"
+ // kubeTerminalPluginConfigSetting is config value sent back to the client to indicate if the kube terminal is enabled
+ kubeTerminalPluginConfigSetting = "kubeTerminalEnabled"
)
+// Init creates a new instance of the Kubernetes plugin
func Init(portalProxy interfaces.PortalProxy) (interfaces.StratosPlugin, error) {
- return &KubernetesSpecification{portalProxy: portalProxy, endpointType: kubeEndpointType}, nil
+ kubeTerminal := terminal.NewKubeTerminal(portalProxy)
+ kube := &KubernetesSpecification{portalProxy: portalProxy, endpointType: kubeEndpointType, kubeTerminal: kubeTerminal}
+ if kubeTerminal != nil {
+ kubeTerminal.Kube = kube
+ }
+ return kube, nil
}
func (c *KubernetesSpecification) GetEndpointPlugin() (interfaces.EndpointPlugin, error) {
@@ -131,6 +142,14 @@ func (c *KubernetesSpecification) Init() error {
// Kube dashboard is enabled by Tech Preview mode
c.portalProxy.GetConfig().PluginConfig[kubeDashboardPluginConfigSetting] = strconv.FormatBool(c.portalProxy.GetConfig().EnableTechPreview)
+ // Kube terminal is enabled by Tech Preview mode
+ c.portalProxy.GetConfig().PluginConfig[kubeTerminalPluginConfigSetting] = strconv.FormatBool(c.portalProxy.GetConfig().EnableTechPreview)
+
+ // Kick off the cleanup of any old kube terminal pods
+ if c.kubeTerminal != nil {
+ c.kubeTerminal.StartCleanup()
+ }
+
return nil
}
@@ -159,6 +178,10 @@ func (c *KubernetesSpecification) AddSessionGroupRoutes(echoGroup *echo.Group) {
echoGroup.GET("/helm/releases/:endpoint/:namespace/:name/status", c.GetReleaseStatus)
echoGroup.GET("/helm/releases/:endpoint/:namespace/:name", c.GetRelease)
+ // Kube Terminal
+ if c.kubeTerminal != nil {
+ echoGroup.GET("/kubeterminal/:guid", c.kubeTerminal.Start)
+ }
}
func (c *KubernetesSpecification) Info(apiEndpoint string, skipSSLValidation bool) (interfaces.CNSIRecord, interface{}, error) {
diff --git a/src/jetstream/plugins/kubernetes/terminal/cleanup.go b/src/jetstream/plugins/kubernetes/terminal/cleanup.go
new file mode 100644
index 0000000000..621921e1e5
--- /dev/null
+++ b/src/jetstream/plugins/kubernetes/terminal/cleanup.go
@@ -0,0 +1,83 @@
+package terminal
+
+import (
+ "math/rand"
+ "strconv"
+ "time"
+
+ log "github.com/sirupsen/logrus"
+
+ metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// Wait time in minutes after random intiial wait
+const waitPeriod = 10
+
+// StartCleanup starts a background routine to cleanup orphaned pods
+func (k *KubeTerminal) StartCleanup() {
+ go k.cleanup()
+}
+
+func (k *KubeTerminal) cleanup() {
+ // Use a random initial wait before cleaning up
+ // If we had more than one backend, this helps to ensure they are not all trying to cleanup at the same time
+ wait := rand.Intn(30)
+ log.Debug("Kubernetes Terminal cleanup will start in %d minutes", wait)
+
+ for {
+ time.Sleep(time.Duration(wait) * time.Minute)
+ log.Debug("Cleaning up stale Kubernetes Terminal pods and secrets ...")
+
+ // Get all pods with a given label
+ podClient, secretClient, err := k.getClients()
+ if err == nil {
+ // Only want the pods that are kube terminals
+ options := metaV1.ListOptions{}
+ options.LabelSelector = "stratos-role=kube-terminal"
+ pods, err := podClient.List(options)
+ if err == nil {
+ for _, pod := range pods.Items {
+ if sessionID, ok := pod.Annotations[stratosSessionAnnotation]; ok {
+ i, err := strconv.Atoi(sessionID)
+ if err == nil {
+ isValid, err := k.PortalProxy.GetSessionDataStore().IsValidSession(i)
+ if err == nil && !isValid {
+ log.Debugf("Deleting pod %s", pod.Name)
+ podClient.Delete(pod.Name, nil)
+ }
+ }
+ }
+ }
+ } else {
+ log.Debug("Kube Terminal Cleanup: Could not get pods")
+ log.Debug(err)
+ }
+
+ // Only want the secrets that are kube terminals
+ secrets, err := secretClient.List(options)
+ if err == nil {
+ for _, secret := range secrets.Items {
+ if sessionID, ok := secret.Annotations[stratosSessionAnnotation]; ok {
+ i, err := strconv.Atoi(sessionID)
+ if err == nil {
+ isValid, err := k.PortalProxy.GetSessionDataStore().IsValidSession(i)
+ if err == nil && !isValid {
+ log.Debugf("Deleting secret %s", secret.Name)
+ secretClient.Delete(secret.Name, nil)
+ }
+ }
+ }
+ }
+ } else {
+ log.Warn("Kube Terminal Cleanup: Could not get secrets")
+ log.Warn(err)
+ }
+
+ } else {
+ log.Warn("Kube Terminal Cleanup: Could not get clients")
+ log.Warn(err)
+ }
+
+ wait = waitPeriod
+ }
+}
diff --git a/src/jetstream/plugins/kubernetes/terminal/helpers.go b/src/jetstream/plugins/kubernetes/terminal/helpers.go
new file mode 100644
index 0000000000..b34caf9a9a
--- /dev/null
+++ b/src/jetstream/plugins/kubernetes/terminal/helpers.go
@@ -0,0 +1,266 @@
+package terminal
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "regexp"
+ "strings"
+ "time"
+
+ "github.com/labstack/echo"
+ uuid "github.com/satori/go.uuid"
+ log "github.com/sirupsen/logrus"
+
+ "github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/kubernetes/auth"
+ "github.com/cloudfoundry-incubator/stratos/src/jetstream/repository/interfaces"
+
+ "github.com/gorilla/websocket"
+ v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/kubernetes"
+ corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+)
+
+// PodCreationData stores the clients and names used to create pod and secret
+type PodCreationData struct {
+ Namespace string
+ PodClient corev1.PodInterface
+ SecretClient corev1.SecretInterface
+ PodName string
+ SecretName string
+}
+
+func (k *KubeTerminal) getClients() (corev1.PodInterface, corev1.SecretInterface, error) {
+
+ // Create a token record for Token Auth using the Service Account token
+ token := auth.NewKubeTokenAuthTokenRecord(k.PortalProxy, string(k.Token))
+ config, err := k.Kube.GetConfigForEndpoint(k.APIServer, *token)
+ if err != nil {
+ return nil, nil, errors.New("Can not get Kubernetes config for specified endpoint")
+ }
+ kubeClient, err := kubernetes.NewForConfig(config)
+ if err != nil {
+ log.Error("Could not get kube client")
+ return nil, nil, err
+ }
+
+ podClient := kubeClient.CoreV1().Pods(k.Namespace)
+ secretsClient := kubeClient.CoreV1().Secrets(k.Namespace)
+ return podClient, secretsClient, nil
+}
+
+// Create a pod for a user to run the Kube terminal
+func (k *KubeTerminal) createPod(c echo.Context, kubeConfig, kubeVersion string, ws *websocket.Conn) (*PodCreationData, error) {
+ // Unique ID for the secret and pod name
+ id := uuid.NewV4().String()
+ id = strings.ReplaceAll(id, "-", "")
+ // Names for the secret and pod
+ secretName := fmt.Sprintf("terminal-%s", id)
+ podName := secretName
+ podClient, secretClient, err := k.getClients()
+ result := &PodCreationData{}
+ result.Namespace = k.Namespace
+
+ // Get the session ID
+ sessionID := ""
+ session, err := k.PortalProxy.GetSession(c)
+ if err == nil {
+ sessionID = session.ID
+ }
+
+ // Create the secret
+ secretSpec := &v1.Secret{
+ TypeMeta: metav1.TypeMeta{
+ Kind: "secret",
+ APIVersion: "v1",
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Name: secretName,
+ Namespace: k.Namespace,
+ },
+ Type: "Opaque",
+ }
+
+ setResourcMetadata(&secretSpec.ObjectMeta, sessionID)
+
+ secretSpec.Data = make(map[string][]byte)
+ secretSpec.Data["kubeconfig"] = []byte(kubeConfig)
+
+ // Get Helm repository script if we have Helm repositories
+ helmSetup := getHelmRepoSetupScript(k.PortalProxy)
+ if len(helmSetup) > 0 {
+ secretSpec.Data["helm-setup"] = []byte(helmSetup)
+ }
+
+ _, err = secretClient.Create(secretSpec)
+ if err != nil {
+ log.Warnf("Kubernetes Terminal: Unable to create Secret: %+v", err)
+ return result, err
+ }
+
+ result.SecretClient = secretClient
+ result.SecretName = secretName
+
+ // Pod
+ podSpec := &v1.Pod{
+ TypeMeta: metav1.TypeMeta{
+ Kind: "pod",
+ APIVersion: "v1",
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Name: podName,
+ Namespace: k.Namespace,
+ },
+ }
+
+ // Label the pod, so we can find it as a kube terminal pod
+ setResourcMetadata(&podSpec.ObjectMeta, sessionID)
+
+ // Don't mount a service account token
+ off := false
+ podSpec.Spec.AutomountServiceAccountToken = &off
+ podSpec.Spec.EnableServiceLinks = &off
+ podSpec.Spec.RestartPolicy = "Never"
+ podSpec.Spec.DNSPolicy = "Default"
+
+ volumeMountsSpec := make([]v1.VolumeMount, 1)
+ volumeMountsSpec[0].Name = "kubeconfig"
+ volumeMountsSpec[0].MountPath = "/home/stratos/.stratos"
+ volumeMountsSpec[0].ReadOnly = true
+
+ containerSpec := make([]v1.Container, 1)
+ containerSpec[0].Name = consoleContainerName
+ containerSpec[0].Image = k.Image
+ containerSpec[0].ImagePullPolicy = "Always"
+ containerSpec[0].VolumeMounts = volumeMountsSpec
+
+ // Add env var for kube version
+ containerSpec[0].Env = make([]v1.EnvVar, 1)
+ containerSpec[0].Env[0].Name = "K8S_VERSION"
+ containerSpec[0].Env[0].Value = kubeVersion
+
+ podSpec.Spec.Containers = containerSpec
+
+ volumesSpec := make([]v1.Volume, 1)
+ volumesSpec[0].Name = "kubeconfig"
+ volumesSpec[0].Secret = &v1.SecretVolumeSource{
+ SecretName: secretName,
+ }
+ podSpec.Spec.Volumes = volumesSpec
+
+ // Create a new pod
+ pod, err := podClient.Create(podSpec)
+ if err != nil {
+ log.Warnf("Kubernetes Terminal: Unable to create Pod: %+v", err)
+ // Secret will get cleaned up by caller
+ return result, err
+ }
+
+ result.PodClient = podClient
+ result.PodName = podName
+
+ sendProgressMessage(ws, "Waiting for Kubernetes Terminal to start up ...")
+
+ // Wait for the pod to be running
+ timeout := 60
+ statusOptions := metav1.GetOptions{}
+ for {
+ status, err := podClient.Get(pod.Name, statusOptions)
+ if err == nil && status.Status.Phase == "Running" {
+ break;
+ }
+
+ timeout = timeout - 1
+ if timeout == 0 {
+ err = errors.New("Timed out waiting for pod to enter ready state")
+ break
+ }
+
+ // Sleep
+ time.Sleep(1500 * time.Millisecond)
+ }
+
+ return result, err
+}
+
+func setResourcMetadata(metadata *metav1.ObjectMeta, sessionID string) {
+ // Label the kubeerntes resource, so we can find it as a kube terminal pod
+ metadata.Labels = make(map[string]string)
+ metadata.Labels[stratosRoleLabel] = stratosKubeTerminalRole
+ metadata.Annotations = make(map[string]string)
+ if len(sessionID) > 0 {
+ metadata.Annotations[stratosSessionAnnotation] = sessionID
+ }
+}
+
+// Cleanup the pod and secret
+func (k *KubeTerminal) cleanupPodAndSecret(podData *PodCreationData) error {
+ if len(podData.PodName) > 0 {
+ //captureBashHistory(podData)
+ podData.PodClient.Delete(podData.PodName, nil)
+ }
+
+ if len(podData.SecretName) > 0 {
+ podData.SecretClient.Delete(podData.SecretName, nil)
+ }
+
+ return nil
+}
+
+func getHelmRepoSetupScript(portalProxy interfaces.PortalProxy) string {
+ str := ""
+
+ // Get all of the helm endpoints
+ endpoints, err := portalProxy.ListEndpoints()
+ if err != nil {
+ log.Error("Can not list Helm Repository endpoints")
+ return str
+ }
+
+ for _, ep := range endpoints {
+ if ep.CNSIType == "helm" {
+ str += fmt.Sprintf("helm repo add %s %s > /dev/null\n", ep.Name, ep.APIEndpoint)
+ }
+ }
+
+ return str
+}
+
+func sendProgressMessage(ws *websocket.Conn, progressMsg string) {
+ // Send a message to say that we are creating the pod
+ msg := fmt.Sprintf("\033]2;%s\007", progressMsg)
+ bytes := fmt.Sprintf("% x\n", []byte(msg))
+ if err := ws.WriteMessage(websocket.TextMessage, []byte(bytes)); err != nil {
+ log.Error("Could not send message to client to indicate terminal is starting")
+ }
+}
+
+func (k *KubeTerminal) getKubeVersion(endpointID, userID string) (string, error) {
+ response, err := k.PortalProxy.DoProxySingleRequest(endpointID, userID, "GET", "/api/v1/nodes", nil, nil)
+ if err != nil || response.StatusCode != 200 {
+ return "", errors.New("Could not fetch node list")
+ }
+
+ var nodes v1.NodeList
+ err = json.Unmarshal(response.Response, &nodes)
+ if err != nil {
+ return "", errors.New("Could not unmarshal node list")
+ }
+
+ if len(nodes.Items) > 0 {
+ // Get the version number - remove any 'v' perfix or '+' suffix
+ version := nodes.Items[0].Status.NodeInfo.KubeletVersion
+ reg, err := regexp.Compile("[^0-9\\.]+")
+ if err == nil {
+ version = reg.ReplaceAllString(version, "")
+ }
+ parts := strings.Split(version, ".")
+ if len(parts) > 1 {
+ v := fmt.Sprintf("%s.%s", parts[0], parts[1])
+ return v, nil
+ }
+ }
+
+ return "", errors.New("Can not get Kubernetes version")
+}
diff --git a/src/jetstream/plugins/kubernetes/terminal/start.go b/src/jetstream/plugins/kubernetes/terminal/start.go
new file mode 100644
index 0000000000..63c90fe519
--- /dev/null
+++ b/src/jetstream/plugins/kubernetes/terminal/start.go
@@ -0,0 +1,204 @@
+package terminal
+
+import (
+ "crypto/tls"
+ "errors"
+ "fmt"
+
+ //"encoding/base64"
+ "encoding/json"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/labstack/echo"
+ log "github.com/sirupsen/logrus"
+
+ "github.com/cloudfoundry-incubator/stratos/src/jetstream/repository/interfaces"
+
+ "github.com/gorilla/websocket"
+)
+
+// TTY Resize, see: https://gitlab.cncf.ci/kubernetes/kubernetes/commit/3b21a9901bcd48bb452d3bf1a0cddc90dae142c4#9691a2f9b9c30711f0397221db0b9ac55ab0e2d1
+
+// Allow connections from any Origin
+var upgrader = websocket.Upgrader{
+ CheckOrigin: func(r *http.Request) bool { return true },
+}
+
+// KeyCode - JSON object that is passed from the front-end to notify of a key press or a term resize
+type KeyCode struct {
+ Key string `json:"key"`
+ Cols int `json:"cols"`
+ Rows int `json:"rows"`
+}
+
+type terminalSize struct {
+ Width uint16
+ Height uint16
+}
+
+const (
+ // Time allowed to write a message to the peer.
+ writeWait = 10 * time.Second
+)
+
+// Start handles web-socket request to launch a Kubernetes Terminal
+func (k *KubeTerminal) Start(c echo.Context) error {
+ log.Debug("Kube Terminal start request")
+
+ endpointGUID := c.Param("guid")
+ userGUID := c.Get("user_id").(string)
+
+ cnsiRecord, err := k.PortalProxy.GetCNSIRecord(endpointGUID)
+ if err != nil {
+ return errors.New("Could not get endpoint information")
+ }
+
+ // Get token for this user
+ tokenRecord, ok := k.PortalProxy.GetCNSITokenRecord(endpointGUID, userGUID)
+ if !ok {
+ return errors.New("Could not get token")
+ }
+
+ // This is the kube config for the kubernetes endpoint that we want configured in the Terminal
+ kubeConfig, err := k.Kube.GetKubeConfigForEndpoint(cnsiRecord.APIEndpoint.String(), tokenRecord, "")
+ if err != nil {
+ return errors.New("Can not get Kubernetes config for specified endpoint")
+ }
+
+ // Determine the Kubernetes version
+ version, _ := k.getKubeVersion(endpointGUID, userGUID)
+ log.Debugf("Kubernetes Version: %s", version)
+
+ // Upgrade the web socket for the incoming request
+ ws, pingTicker, err := interfaces.UpgradeToWebSocket(c)
+ if err != nil {
+ return err
+ }
+ defer ws.Close()
+ defer pingTicker.Stop()
+
+ // We are now in web socket land - we don't want any middleware to change the HTTP response
+ c.Set("Stratos-WebSocket", "true")
+
+ // Send a message to say that we are creating the pod
+ sendProgressMessage(ws, "Launching Kubernetes Terminal ... one moment please")
+
+ podData, err := k.createPod(c, kubeConfig, version, ws)
+
+ // Clear progress message
+ sendProgressMessage(ws, "")
+
+ if err != nil {
+ log.Errorf("Kubernetes Terminal: Error creating secret or pod: %+v", err)
+ k.cleanupPodAndSecret(podData)
+
+ // Send error message
+ sendProgressMessage(ws, "!" + err.Error())
+ return err
+ }
+
+ // API Endpoint to SSH/exec into a container
+ target := fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s/exec?command=/bin/bash&stdin=true&stderr=true&stdout=true&tty=true", k.APIServer, k.Namespace, podData.PodName)
+
+ dialer := &websocket.Dialer{
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: true,
+ },
+ }
+
+ if strings.HasPrefix(target, "https://") {
+ target = "wss://" + target[8:]
+ } else {
+ target = "ws://" + target[7:]
+ }
+
+ header := &http.Header{}
+ header.Add("Authorization", fmt.Sprintf("Bearer %s", string(k.Token)))
+ wsConn, _, err := dialer.Dial(target, *header)
+
+ if err == nil {
+ defer wsConn.Close()
+ }
+
+ if err != nil {
+ k.cleanupPodAndSecret(podData)
+ log.Warn("Kube Terminal: Could not connect to pod")
+ // No point returning an error - we've already upgraded to web sockets, so we can't use the HTTP response now
+ return nil
+ }
+
+ stdoutDone := make(chan bool)
+ go pumpStdout(ws, wsConn, stdoutDone)
+
+ // If the downstream connection is closed, close the other web socket as well
+ ws.SetCloseHandler(func (code int, text string) error {
+ wsConn.Close()
+ return nil
+ })
+
+ // Read the input from the web socket and pipe it to the SSH client
+ for {
+ _, r, err := ws.ReadMessage()
+ if err != nil {
+ // Check to see if this was because the web socket was closed cleanly
+ closed := false
+ select {
+ case msg := <-stdoutDone:
+ closed = msg
+ }
+ if !closed {
+ log.Errorf("Kubernetes terminal: error reading message from web socket: %+v", err)
+ }
+ log.Debug("Kube Terminal cleaning up ....")
+ k.cleanupPodAndSecret(podData)
+
+ // No point returning an error - we've already upgraded to web sockets, so we can't use the HTTP response now
+ return nil
+ }
+
+ res := KeyCode{}
+ json.Unmarshal(r, &res)
+
+ if res.Cols == 0 {
+ slice := make([]byte, 1)
+ slice[0] = 0
+ slice = append(slice, []byte(res.Key)...)
+ wsConn.WriteMessage(websocket.TextMessage, slice)
+ } else {
+ size := terminalSize{
+ Width: uint16(res.Cols),
+ Height: uint16(res.Rows),
+ }
+ j, _ := json.Marshal(size)
+ resizeStream := []byte{4}
+ slice := append(resizeStream, j...)
+ wsConn.WriteMessage(websocket.TextMessage, slice)
+ }
+ }
+
+ // Cleanup
+ log.Error("Kubernetes Terminal is cleaning up")
+
+ return k.cleanupPodAndSecret(podData)
+}
+
+func pumpStdout(ws *websocket.Conn, source *websocket.Conn, done chan bool) {
+ for {
+ _, r, err := source.ReadMessage()
+ if err != nil {
+ // Close
+ ws.Close()
+ done <- true
+ break
+ }
+ ws.SetWriteDeadline(time.Now().Add(writeWait))
+ bytes := fmt.Sprintf("% x\n", r[1:])
+ if err := ws.WriteMessage(websocket.TextMessage, []byte(bytes)); err != nil {
+ log.Errorf("Kubernetes Terminal failed to write message: %+v", err)
+ ws.Close()
+ break
+ }
+ }
+}
diff --git a/src/jetstream/plugins/kubernetes/terminal/terminal.go b/src/jetstream/plugins/kubernetes/terminal/terminal.go
new file mode 100644
index 0000000000..83d259147b
--- /dev/null
+++ b/src/jetstream/plugins/kubernetes/terminal/terminal.go
@@ -0,0 +1,85 @@
+package terminal
+
+import (
+ "fmt"
+ "io/ioutil"
+
+ "github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/kubernetes/api"
+ "github.com/cloudfoundry-incubator/stratos/src/jetstream/repository/interfaces"
+ "github.com/cloudfoundry-incubator/stratos/src/jetstream/repository/interfaces/config"
+
+ log "github.com/sirupsen/logrus"
+)
+
+const (
+ serviceAccountTokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
+ serviceHostEnvVar = "KUBERNETES_SERVICE_HOST"
+ servicePortEnvVar = "KUBERNETES_SERVICE_PORT"
+ // For dev - read token from env var
+ serviceTokenEnvVar = "KUBE_TERMINAL_SERVICE_ACCOUNT_TOKEN"
+
+ stratosRoleLabel = "stratos-role"
+ stratosKubeTerminalRole = "kube-terminal"
+ stratosSessionAnnotation = "stratos-session"
+
+ consoleContainerName = "kube-terminal"
+)
+
+// KubeTerminal supports spawning pods to provide a CLI environment to the user
+type KubeTerminal struct {
+ PortalProxy interfaces.PortalProxy
+ Namespace string `configName:"STRATOS_KUBERNETES_NAMESPACE"`
+ Image string `configName:"STRATOS_KUBERNETES_TERMINAL_IMAGE"`
+ Token []byte
+ APIServer string
+ Kube api.Kubernetes
+}
+
+// NewKubeTerminal checks that the environment is set up to support the Kube Terminal
+func NewKubeTerminal(p interfaces.PortalProxy) *KubeTerminal {
+ // Only enabled in tech preview
+ if !p.GetConfig().EnableTechPreview {
+ log.Info("Kube Terminal not enabled - requires tech preview")
+ return nil
+ }
+
+ kt := &KubeTerminal{
+ PortalProxy: p,
+ }
+ if err := config.Load(kt, p.Env().Lookup); err != nil {
+ log.Warnf("Unable to load Kube Terminal configuration. %v", err)
+ return nil
+ }
+
+ // Check that we have everything we need
+ if len(kt.Image) == 0 || len(kt.Namespace) == 0 {
+ log.Warn("Kube Terminal configuration is not complete")
+ return nil
+ }
+
+ // Read the Kubernetes API Endpoint
+ host, hostFound := p.Env().Lookup(serviceHostEnvVar)
+ port, portFound := p.Env().Lookup(servicePortEnvVar)
+ if !hostFound || !portFound {
+ log.Warn("Kubernetes API Server configuration not found (host and/or port env vars not set)")
+ return nil
+ }
+ kt.APIServer = fmt.Sprintf("https://%s:%s", host, port)
+
+ // Read the Service Account Token
+ token, err := ioutil.ReadFile(serviceAccountTokenFile)
+ if err != nil {
+ // Check env var
+ tkn, found := p.Env().Lookup(serviceTokenEnvVar)
+ if !found {
+ log.Warnf("Unable to load Service Account token. %v", err)
+ return nil
+ }
+ token = []byte(tkn)
+ }
+
+ kt.Token = token
+
+ log.Debug("Kubernetes Terminal configured")
+ return kt
+}
diff --git a/src/jetstream/repository/interfaces/sessiondata.go b/src/jetstream/repository/interfaces/sessiondata.go
index 4128f4280f..37884b576a 100644
--- a/src/jetstream/repository/interfaces/sessiondata.go
+++ b/src/jetstream/repository/interfaces/sessiondata.go
@@ -10,9 +10,13 @@ type SessionDataStore interface {
SetValues(session, group string, values map[string]string, autoExpire bool) error
DeleteValues(session, group string) error
+ IsValidSession(id int) (bool, error)
+
// Cleanup runs a background goroutine every interval that deletes expired sessions from the database
Cleanup(interval time.Duration) (chan<- struct{}, <-chan struct{})
// StopCleanup stops the background cleanup from running
StopCleanup(quit chan<- struct{}, done <-chan struct{})
+
+
}
diff --git a/src/jetstream/repository/sessiondata/psql_sessiondata.go b/src/jetstream/repository/sessiondata/psql_sessiondata.go
index d0c3e7c1a0..c60ad32b63 100644
--- a/src/jetstream/repository/sessiondata/psql_sessiondata.go
+++ b/src/jetstream/repository/sessiondata/psql_sessiondata.go
@@ -3,6 +3,8 @@ package sessiondata
import (
"database/sql"
"fmt"
+ "strconv"
+ "time"
log "github.com/sirupsen/logrus"
@@ -16,12 +18,15 @@ var insertSessionDataValue = `INSERT INTO session_data (session, groupName, name
var deleteSessionGroupData = `DELETE FROM session_data WHERE session=$1 AND groupName=$2`
-// Expire data for sessions that not longer exist
+// Expire data for sessions that no longer exist
var expireSessionData = `UPDATE session_data SET expired=true WHERE session NOT IN (SELECT id from sessions)`
// Delete data for sessions that no longer exist
var deleteSessionData = `DELETE FROM session_data WHERE expired=true AND keep_on_expire=false`
+// Check if a session valid
+var isValidSession = `SELECT id, expires_on from sessions WHERE id=$1`
+
// SessionDataRepository is a RDB-backed Session Data repository
type SessionDataRepository struct {
db *sql.DB
@@ -40,6 +45,7 @@ func InitRepositoryProvider(databaseProvider string) {
deleteSessionGroupData = datastore.ModifySQLStatement(deleteSessionGroupData, databaseProvider)
expireSessionData = datastore.ModifySQLStatement(expireSessionData, databaseProvider)
deleteSessionData = datastore.ModifySQLStatement(deleteSessionData, databaseProvider)
+ isValidSession = datastore.ModifySQLStatement(isValidSession, databaseProvider)
}
// GetValues returns all values from the config table as a map
@@ -100,3 +106,27 @@ func (c *SessionDataRepository) SetValues(session, group string, values map[stri
return nil
}
+
+// IsValidSession - Determines if the given session ID is still valid (has not expired)
+func (c *SessionDataRepository) IsValidSession(session int) (bool, error) {
+ var (
+ id string
+ expiry time.Time
+ )
+
+ err := c.db.QueryRow(isValidSession, strconv.Itoa(session)).Scan(&id, &expiry)
+
+ switch {
+ case err == sql.ErrNoRows:
+ // No record with this ID - session does not exist
+ return false, nil
+ case err != nil:
+ return false, fmt.Errorf("Error trying to find Session record: %v", err)
+ default:
+ // do nothing
+ }
+
+ // Check if the session has expired
+ now := time.Now()
+ return expiry.After(now), nil
+}