Skip to content

Commit

Permalink
Add visualization-server service to lightweight deployment (#1844)
Browse files Browse the repository at this point in the history
* Add visualization-server service to lightweight deployment

* Addressed PR suggestions

* Added field to determine if visualization service is active and fixed unit tests for visualization_server.go

* Additional small fixes

* port change from 88888 -> 8888
* version change from 0.1.15 -> 0.1.26
* removed visualization-server from base/kustomization.yaml

* Fixed visualization_server_test.go to reflect new changes

* Changed implementation to be fail fast

* Changed host name to be constant provided by environment

* Added retry and extracted isVisualizationServiceAlive logic to function

* Fixed deployment.yaml file

* Fixed serviceURL configuration issuse

serviceURL is now properly obtained from the environment, the service ip address and port are used rather than service name and namespace

* Added log message to indicate when visualization service is unreachable

* Addressed PR comments

* Removed _HTTP
  • Loading branch information
ajchili authored and k8s-ci-robot committed Aug 22, 2019
1 parent ad307db commit 8c3d6fe
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 12 deletions.
3 changes: 3 additions & 0 deletions backend/src/apiserver/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ const (

podNamespace = "POD_NAMESPACE"
initConnectionTimeout = "InitConnectionTimeout"

visualizationServiceHost = "ML_PIPELINE_VISUALIZATIONSERVER_SERVICE_HOST"
visualizationServicePort = "ML_PIPELINE_VISUALIZATIONSERVER_SERVICE_PORT"
)

// Container for all service clients
Expand Down
9 changes: 8 additions & 1 deletion backend/src/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,14 @@ func startRpcServer(resourceManager *resource.ResourceManager) {
api.RegisterRunServiceServer(s, server.NewRunServer(resourceManager))
api.RegisterJobServiceServer(s, server.NewJobServer(resourceManager))
api.RegisterReportServiceServer(s, server.NewReportServer(resourceManager))
api.RegisterVisualizationServiceServer(s, server.NewVisualizationServer(resourceManager))
api.RegisterVisualizationServiceServer(
s,
server.NewVisualizationServer(
resourceManager,
getStringConfig(visualizationServiceHost),
getStringConfig(visualizationServicePort),
getDurationConfig(initConnectionTimeout),
))

// Register reflection service on gRPC server.
reflection.Register(s)
Expand Down
1 change: 1 addition & 0 deletions backend/src/apiserver/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"//backend/src/common/util:go_default_library",
"//backend/src/crd/pkg/apis/scheduledworkflow/v1beta1:go_default_library",
"@com_github_argoproj_argo//pkg/apis/workflow/v1alpha1:go_default_library",
"@com_github_cenkalti_backoff//:go_default_library",
"@com_github_golang_glog//:go_default_library",
"@com_github_golang_protobuf//jsonpb:go_default_library_gen",
"@com_github_robfig_cron//:go_default_library",
Expand Down
45 changes: 41 additions & 4 deletions backend/src/apiserver/server/visualization_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/cenkalti/backoff"
"github.com/golang/glog"
"github.com/kubeflow/pipelines/backend/api/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/resource"
"github.com/kubeflow/pipelines/backend/src/common/util"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
)

type VisualizationServer struct {
resourceManager *resource.ResourceManager
serviceURL string
resourceManager *resource.ResourceManager
serviceURL string
isServiceAvailable bool
}

func (s *VisualizationServer) CreateVisualization(ctx context.Context, request *go_client.CreateVisualizationRequest) (*go_client.Visualization, error) {
Expand Down Expand Up @@ -56,6 +60,12 @@ func (s *VisualizationServer) validateCreateVisualizationRequest(request *go_cli
// service to generate HTML visualizations from a request.
// It returns the generated HTML as a string and any error that is encountered.
func (s *VisualizationServer) generateVisualizationFromRequest(request *go_client.CreateVisualizationRequest) ([]byte, error) {
if !s.isServiceAvailable {
return nil, util.NewInternalServerError(
fmt.Errorf("service not available"),
"Service not available",
)
}
visualizationType := strings.ToLower(go_client.Visualization_Type_name[int32(request.Visualization.Type)])
arguments := fmt.Sprintf("--type %s --source %s --arguments '%s'", visualizationType, request.Visualization.Source, request.Visualization.Arguments)
resp, err := http.PostForm(s.serviceURL, url.Values{"arguments": {arguments}})
Expand All @@ -73,6 +83,33 @@ func (s *VisualizationServer) generateVisualizationFromRequest(request *go_clien
return body, nil
}

func NewVisualizationServer(resourceManager *resource.ResourceManager) *VisualizationServer {
return &VisualizationServer{resourceManager: resourceManager, serviceURL: "http://visualization-service.kubeflow"}
func isVisualizationServiceAlive(serviceURL string, initConnectionTimeout time.Duration) bool {
var operation = func() error {
_, err := http.Get(serviceURL)
if err != nil {
glog.Error("Unable to verify visualization service is alive!", err)
return err
}
return nil
}
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = initConnectionTimeout
err := backoff.Retry(operation, b)
return err == nil
}

func NewVisualizationServer(resourceManager *resource.ResourceManager, serviceHost string, servicePort string, initConnectionTimeout time.Duration) *VisualizationServer {
serviceURL := fmt.Sprintf("http://%s:%s", serviceHost, servicePort)
isServiceAvailable := isVisualizationServiceAlive(serviceURL, initConnectionTimeout)
return &VisualizationServer{
resourceManager: resourceManager,
serviceURL: serviceURL,
// TODO: isServiceAvailable is used to determine if the new visualization
// service is alive. If this is true, then the service is alive and
// requests can be made to it. Otherwise, if it is false, the service is
// not alive and requests should not be made. This prevents timeouts and
// counteracts current instabilities with the service. This should be
// removed after the visualization service is deemed stable.
isServiceAvailable: isServiceAvailable,
}
}
60 changes: 53 additions & 7 deletions backend/src/apiserver/server/visualization_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import (
func TestValidateCreateVisualizationRequest(t *testing.T) {
clients, manager, _ := initWithExperiment(t)
defer clients.Close()
server := NewVisualizationServer(manager)
server := &VisualizationServer{
resourceManager: manager,
isServiceAvailable: false,
}
visualization := &go_client.Visualization{
Type: go_client.Visualization_ROC_CURVE,
Source: "gs://ml-pipeline/roc/data.csv",
Expand All @@ -27,7 +30,10 @@ func TestValidateCreateVisualizationRequest(t *testing.T) {
func TestValidateCreateVisualizationRequest_ArgumentsAreEmpty(t *testing.T) {
clients, manager, _ := initWithExperiment(t)
defer clients.Close()
server := NewVisualizationServer(manager)
server := &VisualizationServer{
resourceManager: manager,
isServiceAvailable: false,
}
visualization := &go_client.Visualization{
Type: go_client.Visualization_ROC_CURVE,
Source: "gs://ml-pipeline/roc/data.csv",
Expand All @@ -43,7 +49,10 @@ func TestValidateCreateVisualizationRequest_ArgumentsAreEmpty(t *testing.T) {
func TestValidateCreateVisualizationRequest_SourceIsEmpty(t *testing.T) {
clients, manager, _ := initWithExperiment(t)
defer clients.Close()
server := NewVisualizationServer(manager)
server := &VisualizationServer{
resourceManager: manager,
isServiceAvailable: false,
}
visualization := &go_client.Visualization{
Type: go_client.Visualization_ROC_CURVE,
Source: "",
Expand All @@ -59,7 +68,10 @@ func TestValidateCreateVisualizationRequest_SourceIsEmpty(t *testing.T) {
func TestValidateCreateVisualizationRequest_ArgumentsNotValidJSON(t *testing.T) {
clients, manager, _ := initWithExperiment(t)
defer clients.Close()
server := NewVisualizationServer(manager)
server := &VisualizationServer{
resourceManager: manager,
isServiceAvailable: false,
}
visualization := &go_client.Visualization{
Type: go_client.Visualization_ROC_CURVE,
Source: "gs://ml-pipeline/roc/data.csv",
Expand All @@ -80,7 +92,11 @@ func TestGenerateVisualization(t *testing.T) {
rw.Write([]byte("roc_curve"))
}))
defer httpServer.Close()
server := &VisualizationServer{resourceManager: manager, serviceURL: httpServer.URL}
server := &VisualizationServer{
resourceManager: manager,
serviceURL: httpServer.URL,
isServiceAvailable: true,
}
visualization := &go_client.Visualization{
Type: go_client.Visualization_ROC_CURVE,
Source: "gs://ml-pipeline/roc/data.csv",
Expand All @@ -90,8 +106,34 @@ func TestGenerateVisualization(t *testing.T) {
Visualization: visualization,
}
body, err := server.generateVisualizationFromRequest(request)
assert.Equal(t, []byte("roc_curve"), body)
assert.Nil(t, err)
assert.Equal(t, []byte("roc_curve"), body)
}

func TestGenerateVisualization_ServiceNotAvailableError(t *testing.T) {
clients, manager, _ := initWithExperiment(t)
defer clients.Close()
httpServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, "/", req.URL.String())
rw.WriteHeader(500)
}))
defer httpServer.Close()
server := &VisualizationServer{
resourceManager: manager,
serviceURL: httpServer.URL,
isServiceAvailable: false,
}
visualization := &go_client.Visualization{
Type: go_client.Visualization_ROC_CURVE,
Source: "gs://ml-pipeline/roc/data.csv",
Arguments: "{}",
}
request := &go_client.CreateVisualizationRequest{
Visualization: visualization,
}
body, err := server.generateVisualizationFromRequest(request)
assert.Nil(t, body)
assert.Equal(t, "InternalServerError: Service not available: service not available", err.Error())
}

func TestGenerateVisualization_ServerError(t *testing.T) {
Expand All @@ -102,7 +144,11 @@ func TestGenerateVisualization_ServerError(t *testing.T) {
rw.WriteHeader(500)
}))
defer httpServer.Close()
server := &VisualizationServer{resourceManager: manager, serviceURL: httpServer.URL}
server := &VisualizationServer{
resourceManager: manager,
serviceURL: httpServer.URL,
isServiceAvailable: true,
}
visualization := &go_client.Visualization{
Type: go_client.Visualization_ROC_CURVE,
Source: "gs://ml-pipeline/roc/data.csv",
Expand Down
2 changes: 2 additions & 0 deletions manifests/kustomize/base/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ images:
newTag: 0.1.26
- name: gcr.io/ml-pipeline/inverse-proxy-agent
newTag: 0.1.26
- name: gcr.io/ml-pipeline/visualization-server
newTag: 0.1.26
2 changes: 2 additions & 0 deletions manifests/kustomize/base/pipeline/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ resources:
- ml-pipeline-viewer-crd-rolebinding.yaml
- ml-pipeline-viewer-crd-deployment.yaml
- ml-pipeline-viewer-crd-sa.yaml
- ml-pipeline-visualization-deployment.yaml
- ml-pipeline-visualization-service.yaml
- pipeline-runner-role.yaml
- pipeline-runner-rolebinding.yaml
- pipeline-runner-sa.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apiVersion: apps/v1beta2
kind: Deployment
metadata:
labels:
app: ml-pipeline-visualizationserver
name: ml-pipeline-visualizationserver
spec:
selector:
matchLabels:
app: ml-pipeline-visualizationserver
template:
metadata:
labels:
app: ml-pipeline-visualizationserver
spec:
containers:
- image: gcr.io/ml-pipeline/visualization-server:0.1.26
imagePullPolicy: IfNotPresent
name: ml-pipeline-visualizationserver
ports:
- containerPort: 8888
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
apiVersion: v1
kind: Service
metadata:
name: ml-pipeline-visualizationserver
spec:
ports:
- name: http
port: 8888
protocol: TCP
targetPort: 8888
selector:
app: ml-pipeline-visualizationserver

0 comments on commit 8c3d6fe

Please sign in to comment.