Skip to content

Commit

Permalink
Merge branch 'master' into getting_started_page_UI
Browse files Browse the repository at this point in the history
  • Loading branch information
Bobgy authored Feb 13, 2020
2 parents 061ce23 + 0623ac1 commit 7aa0d49
Show file tree
Hide file tree
Showing 76 changed files with 2,074 additions and 618 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ matrix:
python: "2.7"
env: TOXENV=py27
script:
- python -m pip
- pip install six==1.12.0
# Component SDK tests
- cd $TRAVIS_BUILD_DIR/components/gcp/container/component_sdk/python
- ./run_test.sh
Expand All @@ -57,7 +59,7 @@ matrix:
install: &0
- python3 -m pip install -r $TRAVIS_BUILD_DIR/sdk/python/requirements.txt
# Additional dependencies
- pip3 install coverage==4.5.4 coveralls==1.9.2
- pip3 install coverage==4.5.4 coveralls==1.9.2 six==1.12.0
# Sample test infra dependencies
- pip3 install minio
- pip3 install junit_xml
Expand Down
3 changes: 2 additions & 1 deletion backend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ FROM python:3.5 as compiler
RUN apt-get update -y && \
apt-get install --no-install-recommends -y -q default-jdk python3-setuptools python3-dev
RUN wget https://bootstrap.pypa.io/get-pip.py && python3 get-pip.py
RUN python3 -m pip install tfx==0.21.0rc0
# pin avro-python3==1.9.1 because installing just tfx brings avro-python3 1.9.2, which breaks the build
RUN python3 -m pip install avro-python3==1.9.1 tfx==0.21.0rc0

WORKDIR /go/src/github.com/kubeflow/pipelines
COPY sdk sdk
Expand Down
4 changes: 2 additions & 2 deletions backend/api/job.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ service JobService {
};
}

//Enable a job.
//Restarts a job that was previously stopped. All runs associated with the job will continue.
rpc EnableJob(EnableJobRequest) returns (google.protobuf.Empty) {
option (google.api.http) = {
post: "/apis/v1beta1/jobs/{id}/enable"
};
}

//Disable a job.
//Stops a job and all its associated runs. The job is not deleted.
rpc DisableJob(DisableJobRequest) returns (google.protobuf.Empty) {
option (google.api.http) = {
post: "/apis/v1beta1/jobs/{id}/disable"
Expand Down
2 changes: 1 addition & 1 deletion backend/api/pipeline.proto
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ service PipelineService {
};
}

//Get a YAML template for the selected pipeline.
//Returns a single YAML template that contains the description, parameters, and metadata associated with the pipeline provided.
rpc GetTemplate(GetTemplateRequest) returns (GetTemplateResponse) {
option (google.api.http) = {
get: "/apis/v1beta1/pipelines/{id}/templates"
Expand Down
1 change: 1 addition & 0 deletions backend/src/apiserver/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
"@com_github_go_sql_driver_mysql//:go_default_library",
"@com_github_golang_glog//:go_default_library",
"@com_github_minio_minio_go//:go_default_library",
"@com_github_minio_minio_go//pkg/credentials:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@io_k8s_api//core/v1:go_default_library",
"@io_k8s_api//policy/v1beta1:go_default_library",
Expand Down
33 changes: 28 additions & 5 deletions backend/src/apiserver/client/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,54 @@ package client

import (
"fmt"
"net/http"
"time"

"github.com/cenkalti/backoff"
"github.com/golang/glog"
minio "github.com/minio/minio-go"
credentials "github.com/minio/minio-go/pkg/credentials"
"github.com/pkg/errors"
)

// createCredentialProvidersChain builds the credentials handed to the minio
// client.
//
// When both accessKey and secretKey are non-empty, a static V4 credential is
// returned. If either one is empty, the result is a chained credential that
// consults its providers in this order: minio environment variables, AWS
// environment variables, then IAM.
//
// The endpoint parameter is accepted but not read by this function.
func createCredentialProvidersChain(endpoint, accessKey, secretKey string) *credentials.Credentials {
	// An incomplete static key pair means we must discover credentials from
	// the environment instead.
	if accessKey == "" || secretKey == "" {
		iamProvider := &credentials.IAM{
			Client: &http.Client{Transport: http.DefaultTransport},
		}
		chain := &credentials.Chain{
			Providers: []credentials.Provider{
				&credentials.EnvMinio{},
				&credentials.EnvAWS{},
				iamProvider,
			},
		}
		return credentials.New(chain)
	}

	// A complete static key pair always wins over discovered credentials.
	return credentials.NewStaticV4(accessKey, secretKey, "")
}

func CreateMinioClient(minioServiceHost string, minioServicePort string,
accessKey string, secretKey string) (*minio.Client, error) {
minioClient, err := minio.New(joinHostPort(minioServiceHost, minioServicePort),
accessKey, secretKey, false /* Secure connection */)
accessKey string, secretKey string, secure bool, region string) (*minio.Client, error) {

endpoint := joinHostPort(minioServiceHost, minioServicePort)
cred := createCredentialProvidersChain(endpoint, accessKey, secretKey)
minioClient, err := minio.NewWithCredentials(endpoint, cred, secure, region)
if err != nil {
return nil, errors.Wrapf(err, "Error while creating minio client: %+v", err)
}
return minioClient, nil
}

func CreateMinioClientOrFatal(minioServiceHost string, minioServicePort string,
accessKey string, secretKey string, initConnectionTimeout time.Duration) *minio.Client {
accessKey string, secretKey string, secure bool, region string, initConnectionTimeout time.Duration) *minio.Client {
var minioClient *minio.Client
var err error
var operation = func() error {
minioClient, err = CreateMinioClient(minioServiceHost, minioServicePort,
accessKey, secretKey)
accessKey, secretKey, secure, region)
if err != nil {
return err
}
Expand Down
39 changes: 24 additions & 15 deletions backend/src/apiserver/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ import (
const (
minioServiceHost = "MINIO_SERVICE_SERVICE_HOST"
minioServicePort = "MINIO_SERVICE_SERVICE_PORT"
minioServiceRegion = "MINIO_SERVICE_REGION"
minioServiceSecure = "MINIO_SERVICE_SECURE"
pipelineBucketName = "MINIO_PIPELINE_BUCKET_NAME"
pipelinePath = "MINIO_PIPELINE_PATH"
mysqlServiceHost = "DBConfig.Host"
mysqlServicePort = "DBConfig.Port"
mysqlUser = "DBConfig.User"
Expand Down Expand Up @@ -317,29 +321,34 @@ func initMinioClient(initConnectionTimeout time.Duration) storage.ObjectStoreInt
"ObjectStoreConfig.Host", os.Getenv(minioServiceHost))
minioServicePort := common.GetStringConfigWithDefault(
"ObjectStoreConfig.Port", os.Getenv(minioServicePort))
accessKey := common.GetStringConfig("ObjectStoreConfig.AccessKey")
secretKey := common.GetStringConfig("ObjectStoreConfig.SecretAccessKey")
bucketName := common.GetStringConfig("ObjectStoreConfig.BucketName")
minioServiceRegion := common.GetStringConfigWithDefault(
"ObjectStoreConfig.Region", os.Getenv(minioServiceRegion))
minioServiceSecure := common.GetBoolConfigWithDefault(
"ObjectStoreConfig.Secure", common.GetBoolFromStringWithDefault(os.Getenv(minioServiceSecure), false))
accessKey := common.GetStringConfigWithDefault("ObjectStoreConfig.AccessKey", "")
secretKey := common.GetStringConfigWithDefault("ObjectStoreConfig.SecretAccessKey", "")
bucketName := common.GetStringConfigWithDefault("ObjectStoreConfig.BucketName", os.Getenv(pipelineBucketName))
pipelinePath := common.GetStringConfigWithDefault("ObjectStoreConfig.PipelineFolder", os.Getenv(pipelinePath))
disableMultipart := common.GetBoolConfigWithDefault("ObjectStoreConfig.Multipart.Disable", true)

minioClient := client.CreateMinioClientOrFatal(minioServiceHost, minioServicePort, accessKey,
secretKey, initConnectionTimeout)
createMinioBucket(minioClient, bucketName)
secretKey, minioServiceSecure, minioServiceRegion, initConnectionTimeout)
createMinioBucket(minioClient, bucketName, minioServiceRegion)

return storage.NewMinioObjectStore(&storage.MinioClient{Client: minioClient}, bucketName, disableMultipart)
return storage.NewMinioObjectStore(&storage.MinioClient{Client: minioClient}, bucketName, pipelinePath, disableMultipart)
}

func createMinioBucket(minioClient *minio.Client, bucketName string) {
func createMinioBucket(minioClient *minio.Client, bucketName, region string) {
// Check to see if we already own this bucket.
exists, err := minioClient.BucketExists(bucketName)
if exists {
glog.Infof("We already own %s\n", bucketName)
return
}
// Create bucket if it does not exist
err := minioClient.MakeBucket(bucketName, "")
err = minioClient.MakeBucket(bucketName, region)
if err != nil {
// Check to see if we already own this bucket.
exists, err := minioClient.BucketExists(bucketName)
if err == nil && exists {
glog.Infof("We already own %s\n", bucketName)
} else {
glog.Fatalf("Failed to create Minio bucket. Error: %v", err)
}
glog.Fatalf("Failed to create Minio bucket. Error: %v", err)
}
glog.Infof("Successfully created bucket %s\n", bucketName)
}
Expand Down
8 changes: 8 additions & 0 deletions backend/src/apiserver/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,11 @@ func IsMultiUserMode() bool {
// GetPodNamespace returns the string configuration value stored under the
// PodNamespace key, as looked up by GetStringConfig.
func GetPodNamespace() string {
return GetStringConfig(PodNamespace)
}

// GetBoolFromStringWithDefault interprets value as a boolean via
// strconv.ParseBool. If value cannot be parsed (for example because it is
// empty or not a recognized boolean literal), defaultValue is returned.
func GetBoolFromStringWithDefault(value string, defaultValue bool) bool {
	if parsed, err := strconv.ParseBool(value); err == nil {
		return parsed
	}
	return defaultValue
}
5 changes: 3 additions & 2 deletions backend/src/apiserver/config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
"DBName": "mlpipeline",
"GroupConcatMaxLen": "4194304"
},
"ObjectStoreConfig":{
"ObjectStoreConfig": {
"AccessKey": "minio",
"SecretAccessKey": "minio123",
"BucketName": "mlpipeline"
"BucketName": "mlpipeline",
"PipelineFolder": "pipelines"
},
"InitConnectionTimeout": "6m",
"DefaultPipelineRunnerServiceAccount": "pipeline-runner"
Expand Down
8 changes: 4 additions & 4 deletions backend/src/apiserver/config/sample_config.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[
{
"name": "[Demo] ML - XGBoost - Training with Confusion Matrix",
"description": "[GCP Permission requirements](https://github.com/kubeflow/pipelines/blob/master/samples/core/xgboost_training_cm#requirements). [source code](https://github.com/kubeflow/pipelines/blob/master/samples/core/xgboost_training_cm). A trainer that does end-to-end distributed training for XGBoost models.",
"name": "[Demo] XGBoost - Training with Confusion Matrix",
"description": "[source code](https://github.com/kubeflow/pipelines/blob/master/samples/core/xgboost_training_cm). A trainer that does end-to-end distributed training for XGBoost models. More [details](https://github.com/kubeflow/pipelines/blob/master/samples/core/xgboost_training_cm#requirements). ",
"file": "/samples/core/xgboost_training_cm/xgboost_training_cm.py.yaml"
},
{
"name": "[Demo] TFX pipeline - Taxi Tip Prediction Model Trainer",
"description": "[GCP Permission requirements](https://github.com/kubeflow/pipelines/blob/master/samples/core/parameterized_tfx_oss#permission). [source code](https://console.cloud.google.com/mlengine/notebooks/deploy-notebook?q=download_url%3Dhttps%253A%252F%252Fraw.githubusercontent.com%252Fkubeflow%252Fpipelines%252Fmaster%252Fsamples%252Fcore%252Fparameterized_tfx_oss%252Ftaxi_pipeline_notebook.ipynb). Example pipeline that does classification with model analysis based on a public tax cab dataset.",
"name": "[Demo] TFX - Taxi Tip Prediction Model Trainer",
"description": "[source code](https://console.cloud.google.com/mlengine/notebooks/deploy-notebook?q=download_url%3Dhttps%253A%252F%252Fraw.githubusercontent.com%252Fkubeflow%252Fpipelines%252Fmaster%252Fsamples%252Fcore%252Fparameterized_tfx_oss%252Ftaxi_pipeline_notebook.ipynb). Example pipeline that does classification with model analysis based on a public tax cab dataset. More [details](https://github.com/kubeflow/pipelines/blob/master/samples/core/parameterized_tfx_oss). ",
"file": "/samples/core/parameterized_tfx_oss/parameterized_tfx_oss.py.yaml"
},
{
Expand Down
18 changes: 12 additions & 6 deletions backend/src/apiserver/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,18 +128,24 @@ func New(filterProto *api.Filter) (*Filter, error) {
return f, nil
}

// NewWithKeyMap is like New, but takes an additional map for mapping key names
// NewWithKeyMap is like New, but takes an additional map and model name for mapping key names
// in the protocol buffer to an appropriate name for use when querying the
// model. For example, if the API name of a field is "foo" and the equivalent
// model name is "ModelFoo", then filterProto with predicates against key "foo"
// will be parsed as if the key value was "ModelFoo".
func NewWithKeyMap(filterProto *api.Filter, keyMap map[string]string) (*Filter, error) {
// model. For example, if the API name of a field is "name", the model name is "pipelines", and
// the equivalent column name is "Name", then filterProto with predicates against key "name"
// will be parsed as if the key value was "pipelines.Name".
func NewWithKeyMap(filterProto *api.Filter, keyMap map[string]string, modelName string) (*Filter, error) {
// Fully qualify column name to avoid "ambiguous column name" error.
var modelNamePrefix string
if modelName != "" {
modelNamePrefix = modelName + "."
}

for _, pred := range filterProto.Predicates {
k, ok := keyMap[pred.Key]
if !ok {
return nil, util.NewInvalidInputError("no support for filtering on unrecognized field %q", pred.Key)
}
pred.Key = k
pred.Key = modelNamePrefix + k
}
return New(filterProto)
}
Expand Down
55 changes: 55 additions & 0 deletions backend/src/apiserver/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,61 @@ func TestValidNewFilters(t *testing.T) {
}
}

// TestValidNewFiltersWithKeyMap verifies that NewWithKeyMap rewrites API field
// names into model-qualified column names (here "name" -> "pipelines.Name")
// for the EQUALS, NOT_EQUALS, IN and IS_SUBSTRING predicate operators.
func TestValidNewFiltersWithKeyMap(t *testing.T) {
	opts := []cmp.Option{
		cmp.AllowUnexported(Filter{}),
		// Skip the "filterProto" path when comparing; only the parsed
		// predicate maps are asserted on below.
		cmp.FilterPath(func(p cmp.Path) bool {
			return p.String() == "filterProto"
		}, cmp.Ignore()),
		cmpopts.EquateEmpty(),
	}

	// The key map and model name are identical for every case, so build them
	// once instead of on each loop iteration.
	keyMap := map[string]string{
		"id":          "UUID",
		"name":        "Name",
		"created_at":  "CreatedAtInSec",
		"description": "Description",
	}
	const modelName = "pipelines"

	tests := []struct {
		protoStr string
		want     *Filter
	}{
		{
			`predicates { key: "name" op: EQUALS string_value: "pipeline" }`,
			&Filter{eq: map[string]interface{}{"pipelines.Name": "pipeline"}},
		},
		{
			`predicates { key: "name" op: NOT_EQUALS string_value: "pipeline" }`,
			&Filter{neq: map[string]interface{}{"pipelines.Name": "pipeline"}},
		},
		{
			`predicates {
				key: "name" op: IN
				string_values { values: 'pipeline_1' values: 'pipeline_2' } }`,
			&Filter{in: map[string]interface{}{"pipelines.Name": []string{"pipeline_1", "pipeline_2"}}},
		},
		{
			`predicates {
				key: "name" op: IS_SUBSTRING string_value: "pipeline" }`,
			&Filter{substring: map[string]interface{}{"pipelines.Name": "pipeline"}},
		},
	}

	for _, test := range tests {
		filterProto := &api.Filter{}
		if err := proto.UnmarshalText(test.protoStr, filterProto); err != nil {
			t.Errorf("Failed to unmarshal Filter text proto\n%q\nError: %v", test.protoStr, err)
			continue
		}

		got, err := NewWithKeyMap(filterProto, keyMap, modelName)
		if !cmp.Equal(got, test.want, opts...) || err != nil {
			// Name the function actually under test so a failure points at
			// NewWithKeyMap rather than New.
			t.Errorf("NewWithKeyMap(%+v) = %+v, %v\nWant %+v, nil", *filterProto, got, err, test.want)
		}
	}
}

func TestInvalidFilters(t *testing.T) {
tests := []struct {
protoStr string
Expand Down
2 changes: 1 addition & 1 deletion backend/src/apiserver/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func NewOptions(listable Listable, pageSize int, sortBy string, filterProto *api

// Filtering.
if filterProto != nil {
f, err := filter.NewWithKeyMap(filterProto, listable.APIToModelFieldMap())
f, err := filter.NewWithKeyMap(filterProto, listable.APIToModelFieldMap(), listable.GetModelName())
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 7aa0d49

Please sign in to comment.