Skip to content

Commit

Permalink
feat(v2): enable v2 compatible mode in full Kubeflow with zero config.
Browse files Browse the repository at this point in the history
…Fixes #5680 (#5697)

* feat(v2): enable v2 compatible mode in full Kubeflow

* fix

* fix

* move objectstore reelated code to separate package

* fix style
  • Loading branch information
Bobgy authored Jun 2, 2021
1 parent 601b104 commit 43994ab
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 26 deletions.
2 changes: 1 addition & 1 deletion v2/cmd/launch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var (
pipelineName = flag.String("pipeline_name", "", "The current pipeline name.")
pipelineRunID = flag.String("pipeline_run_id", "", "The current pipeline run ID.")
pipelineTaskID = flag.String("pipeline_task_id", "", "The current pipeline task ID.")
pipelineRoot = flag.String("pipeline_root", "minio://mlpipeline/v2/artifacts", "The root output directory in which to store output artifacts.")
pipelineRoot = flag.String("pipeline_root", "", "The root output directory in which to store output artifacts.")
)

func main() {
Expand Down
72 changes: 47 additions & 25 deletions v2/component/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import (

"github.com/golang/glog"
"github.com/kubeflow/pipelines/v2/metadata"
"github.com/kubeflow/pipelines/v2/objectstore"
"github.com/kubeflow/pipelines/v2/third_party/pipeline_spec"
"google.golang.org/protobuf/encoding/protojson"
v1 "k8s.io/api/core/v1"
k8errors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -55,6 +57,8 @@ type Launcher struct {
placeholderReplacements map[string]string
metadataClient *metadata.Client
bucketConfig *bucketConfig
k8sClient *kubernetes.Clientset
namespace string
}

// LauncherOptions are options used when creating Launcher.
Expand Down Expand Up @@ -166,12 +170,40 @@ func (o *LauncherOptions) validate() error {

const outputMetadataFilepath = "/tmp/kfp_outputs/output_metadata.json"
const defaultPipelineRoot = "minio://mlpipeline/v2/artifacts"
const launcherConfigName = "kfp-launcher"
const configKeyDefaultPipelineRoot = "defaultPipelineRoot"

// NewLauncher creates a new launcher object using the JSON-encoded runtimeInfo
// and specified options.
func NewLauncher(runtimeInfo string, options *LauncherOptions) (*Launcher, error) {
restConfig, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("Failed to initialize kubernetes client: %w", err)
}
k8sClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("Failed to initialize kubernetes client set: %w", err)
}
namespace := os.Getenv("KFP_NAMESPACE")
if namespace == "" {
return nil, fmt.Errorf("Env variable 'KFP_NAMESPACE' is empty")
}

if len(options.PipelineRoot) == 0 {
options.PipelineRoot = defaultPipelineRoot
config, err := getLauncherConfig(k8sClient, namespace)
if err != nil {
return nil, err
}
// Launcher config is optional, so it can be nil when err == nil.
if config != nil {
// The key defaultPipelineRoot is also optional in launcher config.
defaultRootFromConfig := config.Data[configKeyDefaultPipelineRoot]
if defaultRootFromConfig != "" {
options.PipelineRoot = defaultRootFromConfig
}
}
glog.Infof("PipelineRoot defaults to %q.", options.PipelineRoot)
}
if err := options.validate(); err != nil {
return nil, err
Expand Down Expand Up @@ -201,6 +233,8 @@ func NewLauncher(runtimeInfo string, options *LauncherOptions) (*Launcher, error
runtimeInfo: rt,
metadataClient: metadataClient,
bucketConfig: bc,
k8sClient: k8sClient,
namespace: namespace,
}, nil
}

Expand Down Expand Up @@ -275,7 +309,7 @@ func (l *Launcher) RunComponent(ctx context.Context, cmd string, args ...string)

execution, err := l.metadataClient.CreateExecution(ctx, pipeline, l.options.TaskName, l.options.PipelineTaskID, l.options.ContainerImage, ecfg)
if err != nil {
fmt.Errorf("unable to create execution: %w", err)
return fmt.Errorf("unable to create execution: %w", err)
}

executor := exec.Command(cmd, args...)
Expand Down Expand Up @@ -733,22 +767,14 @@ func getExecutorOutput() (*pipeline_spec.ExecutorOutput, error) {

func (l *Launcher) openBucket() (*blob.Bucket, error) {
if l.bucketConfig.scheme == "minio://" {
secret, err := getMinioCredential()
cred, err := objectstore.GetMinioCredential(l.k8sClient, l.namespace)
if err != nil {
return nil, fmt.Errorf("Failed to get minio credential: %w", err)
}
accessKey := string(secret.Data["accesskey"])
secretKey := string(secret.Data["secretkey"])
if accessKey == "" {
return nil, fmt.Errorf("The secret which stores minio credential does not have 'accesskey' entry")
}
if secretKey == "" {
return nil, fmt.Errorf("The secret which stores minio credential does not have 'secretkey' entry")
}
sess, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""),
Credentials: credentials.NewStaticCredentials(cred.AccessKey, cred.SecretKey, ""),
Region: aws.String("minio"),
Endpoint: aws.String("minio-service:9000"),
Endpoint: aws.String(objectstore.MinioDefaultEndpoint()),
DisableSSL: aws.Bool(true),
S3ForcePathStyle: aws.Bool(true),
})
Expand All @@ -761,19 +787,15 @@ func (l *Launcher) openBucket() (*blob.Bucket, error) {
return blob.OpenBucket(context.Background(), l.bucketConfig.bucketURL())
}

func getMinioCredential() (*v1.Secret, error) {
restConfig, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("Failed to initialize kubernetes client: %w", err)
}
clientSet, err := kubernetes.NewForConfig(restConfig)
func getLauncherConfig(clientSet *kubernetes.Clientset, namespace string) (*v1.ConfigMap, error) {
config, err := clientSet.CoreV1().ConfigMaps(namespace).Get(context.Background(), launcherConfigName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("Failed to initialize kubernetes client set: %w", err)
}
namespace := os.Getenv("KFP_NAMESPACE")
if namespace == "" {
return nil, fmt.Errorf("Env variable 'KFP_NAMESPACE' is empty")
if k8errors.IsNotFound(err) {
glog.Infof("cannot find launcher configmap: name=%q namespace=%q", launcherConfigName, namespace)
// LauncherConfig is optional, so ignore not found error.
return nil, nil
}
return nil, err
}
secret, err := clientSet.CoreV1().Secrets(namespace).Get(context.Background(), "mlpipeline-minio-artifact", metav1.GetOptions{})
return secret, err
return config, nil
}
78 changes: 78 additions & 0 deletions v2/objectstore/object_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2021 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This package contains helper methods for using object stores.
// TODO: move other object store related methods here.
package objectstore

import (
"context"
"fmt"
"os"

"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

// The endpoint uses Kubernetes service DNS name with namespace:
// https://kubernetes.io/docs/concepts/services-networking/service/#dns
const defaultMinioEndpointInMultiUserMode = "minio-service.kubeflow:9000"
const minioArtifactSecretName = "mlpipeline-minio-artifact"

func MinioDefaultEndpoint() string {
// Discover minio-service in the same namespace by env var.
// https://kubernetes.io/docs/concepts/services-networking/service/#environment-variables
minioHost := os.Getenv("MINIO_SERVICE_SERVICE_HOST")
minioPort := os.Getenv("MINIO_SERVICE_SERVICE_PORT")
if minioHost != "" && minioPort != "" {
// If there is a minio-service Kubernetes service in the same namespace,
// MINIO_SERVICE_SERVICE_HOST and MINIO_SERVICE_SERVICE_PORT env vars should
// exist by default, so we use it as default.
return minioHost + ":" + minioPort
}
// If the env vars do not exist, we guess that we are running in KFP multi user mode, so default minio service should be `minio-service.kubeflow:9000`.
glog.Infof("Cannot detect minio-service in the same namespace, default to %s as MinIO endpoint.", defaultMinioEndpointInMultiUserMode)
return defaultMinioEndpointInMultiUserMode
}

type MinioCredential struct {
AccessKey string
SecretKey string
}

func GetMinioCredential(clientSet *kubernetes.Clientset, namespace string) (cred MinioCredential, err error) {
defer func() {
if err != nil {
// wrap error before returning
err = fmt.Errorf("Failed to get MinIO credential from secret name=%q namespace=%q: %w", minioArtifactSecretName, namespace, err)
}
}()
secret, err := clientSet.CoreV1().Secrets(namespace).Get(
context.Background(),
minioArtifactSecretName,
metav1.GetOptions{})
if err != nil {
return cred, err
}
cred.AccessKey = string(secret.Data["accesskey"])
cred.SecretKey = string(secret.Data["secretkey"])
if cred.AccessKey == "" {
return cred, fmt.Errorf("does not have 'accesskey' key")
}
if cred.SecretKey == "" {
return cred, fmt.Errorf("does not have 'secretkey' key")
}
return cred, nil
}
69 changes: 69 additions & 0 deletions v2/objectstore/object_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2021 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package objectstore_test

import (
"os"
"testing"

"github.com/kubeflow/pipelines/v2/objectstore"
)

func Test_GetMinioDefaultEndpoint(t *testing.T) {
defer func() {
os.Unsetenv("MINIO_SERVICE_SERVICE_HOST")
os.Unsetenv("MINIO_SERVICE_SERVICE_PORT")
}()
tests := []struct {
name string
minioServiceHostEnv string
minioServicePortEnv string
want string
}{
{
name: "In full Kubeflow, KFP multi-user mode on",
minioServiceHostEnv: "",
minioServicePortEnv: "",
want: "minio-service.kubeflow:9000",
},
{
name: "In KFP standalone without multi-user mode",
minioServiceHostEnv: "1.2.3.4",
minioServicePortEnv: "4321",
want: "1.2.3.4:4321",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.minioServiceHostEnv != "" {
os.Setenv("MINIO_SERVICE_SERVICE_HOST", tt.minioServiceHostEnv)
} else {
os.Unsetenv("MINIO_SERVICE_SERVICE_HOST")
}
if tt.minioServicePortEnv != "" {
os.Setenv("MINIO_SERVICE_SERVICE_PORT", tt.minioServicePortEnv)
} else {
os.Unsetenv("MINIO_SERVICE_SERVICE_PORT")
}
got := objectstore.MinioDefaultEndpoint()
if got != tt.want {
t.Errorf(
"MinioDefaultEndpoint() = %q, want %q\nwhen MINIO_SERVICE_SERVICE_HOST=%q MINIO_SERVICE_SERVICE_PORT=%q",
got, tt.want, tt.minioServiceHostEnv, tt.minioServicePortEnv,
)
}
})
}
}

0 comments on commit 43994ab

Please sign in to comment.