From 0282a33922185a421b1d461c00d4565b81e8e888 Mon Sep 17 00:00:00 2001 From: Ajay Gopinathan Date: Mon, 11 Mar 2019 16:53:11 -0700 Subject: [PATCH 1/2] Add fake metadata store and fix tests. Also, add instructions on how to build/run the backend with Bazel. Note that the fake metadata store works, but I need to add proper tests that exercise it. That'll be done in a separate PR. One thing I'm missing here is how to make Bazel run well in Travis. I will send a follow up PR for doing this. --- backend/README.md | 13 +++++++++-- backend/src/apiserver/resource/BUILD.bazel | 3 +++ .../apiserver/resource/client_manager_fake.go | 18 ++++++++++++++- backend/src/apiserver/storage/run_store.go | 23 ++++++++++++------- 4 files changed, 46 insertions(+), 11 deletions(-) diff --git a/backend/README.md b/backend/README.md index 81c7aaad9c9..a10c0dd00b4 100644 --- a/backend/README.md +++ b/backend/README.md @@ -5,14 +5,23 @@ Pipelines backend. All components can be built using [Bazel](https://bazel.build/). To build everything under backend, run: ``` -bazel build //backend/... +bazel build --action_env=PATH --define=grpc_no_ares=true //backend/... ``` To run all tests: ``` -bazel test //backend/... +bazel test --action_env=PATH --define=grpc_no_ares=true //backend/... ``` +The API server itself can only be built/tested using Bazel. The following commands target building and testing just the API server. +``` +bazel build --action_env=PATH --define=grpc_no_ares=true backend/src/apiserver/... +``` +``` +bazel test --action_env=PATH --define=grpc_no_ares=true backend/src/apiserver/... +``` + + ## Building Go client library and swagger files After making changes to proto files, the Go client libraries and swagger files need to be regenerated and checked-in. 
The backend/api/generate_api.sh diff --git a/backend/src/apiserver/resource/BUILD.bazel b/backend/src/apiserver/resource/BUILD.bazel index b9338213a70..f9d4999d153 100644 --- a/backend/src/apiserver/resource/BUILD.bazel +++ b/backend/src/apiserver/resource/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//backend/api:go_default_library", "//backend/src/apiserver/common:go_default_library", "//backend/src/apiserver/list:go_default_library", + "//backend/src/apiserver/metadata:go_default_library", "//backend/src/apiserver/model:go_default_library", "//backend/src/apiserver/storage:go_default_library", "//backend/src/common/util:go_default_library", @@ -25,6 +26,8 @@ go_library( "@com_github_argoproj_argo//pkg/client/clientset/versioned/typed/workflow/v1alpha1:go_default_library", "@com_github_golang_glog//:go_default_library", "@com_github_pkg_errors//:go_default_library", + "@google_ml_metadata//ml_metadata/metadata_store:metadata_store_go", + "@google_ml_metadata//ml_metadata/proto:metadata_store_go_proto", "@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library", "@io_k8s_apimachinery//pkg/types:go_default_library", "@io_k8s_apimachinery//pkg/watch:go_default_library", diff --git a/backend/src/apiserver/resource/client_manager_fake.go b/backend/src/apiserver/resource/client_manager_fake.go index 71a9ce8468e..1e92861bef0 100644 --- a/backend/src/apiserver/resource/client_manager_fake.go +++ b/backend/src/apiserver/resource/client_manager_fake.go @@ -15,8 +15,12 @@ package resource import ( + "ml_metadata/metadata_store/mlmetadata" + mlpb "ml_metadata/proto/metadata_store_go_proto" + workflowclient "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1" "github.com/golang/glog" + "github.com/kubeflow/pipelines/backend/src/apiserver/metadata" "github.com/kubeflow/pipelines/backend/src/apiserver/storage" "github.com/kubeflow/pipelines/backend/src/common/util" scheduledworkflowclient 
"github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1" @@ -64,7 +68,7 @@ func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterf experimentStore: storage.NewExperimentStore(db, time, uuid), pipelineStore: storage.NewPipelineStore(db, time, uuid), jobStore: storage.NewJobStore(db, time), - runStore: storage.NewRunStore(db, time, nil), + runStore: storage.NewRunStore(db, time, initFakeMetadataStore()), workflowClientFake: NewWorkflowClientFake(), resourceReferenceStore: storage.NewResourceReferenceStore(db), dBStatusStore: storage.NewDBStatusStore(db), @@ -75,6 +79,18 @@ func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterf }, nil } +func initFakeMetadataStore() *metadata.Store { + cfg := &mlpb.ConnectionConfig{ + Config: &mlpb.ConnectionConfig_FakeDatabase{&mlpb.FakeDatabaseConfig{}}, + } + + mlmdStore, err := mlmetadata.NewStore(cfg) + if err != nil { + glog.Fatalf("Failed to create ML Metadata store: %v", err) + } + return metadata.NewStore(mlmdStore) +} + func NewFakeClientManagerOrFatal(time util.TimeInterface) *FakeClientManager { uuid := util.NewFakeUUIDGeneratorOrFatal(DefaultFakeUUID, nil) fakeStore, err := NewFakeClientManager(time, uuid) diff --git a/backend/src/apiserver/storage/run_store.go b/backend/src/apiserver/storage/run_store.go index 48e4aa1a207..60cbeea0266 100644 --- a/backend/src/apiserver/storage/run_store.go +++ b/backend/src/apiserver/storage/run_store.go @@ -365,19 +365,26 @@ func (s *RunStore) UpdateRun(runID string, condition string, workflowRuntimeMani // new in the status of an Argo manifest. This means we need to keep track // manually here on what the previously updated state of the run is, to ensure // we do not add duplicate metadata. Hence the locking below. - row := tx.QueryRow("SELECT WorkflowRuntimeManifest FROM run_details WHERE UUID = ? 
FOR UPDATE", runID) + var query string + switch x := s.db.SQLDialect.(type) { + case MySQLDialect: + query = "SELECT WorkflowRuntimeManifest FROM run_details WHERE UUID = ? FOR UPDATE" + case SQLiteDialect: + query = "SELECT WorkflowRuntimeManifest FROM run_details WHERE UUID = ?" + default: + glog.Fatal("Unsupported SQL dialect: %v", x) + } + row := tx.QueryRow(query, runID) var storedManifest string if err := row.Scan(&storedManifest); err != nil { tx.Rollback() - return util.NewInternalServerError(err, "failed to find row with run id %q", runID) + return util.NewInvalidInputError("Failed to update run %s. Row not found.", runID) } - if s.metadataStore != nil { - if err := s.metadataStore.RecordOutputArtifacts(runID, storedManifest, workflowRuntimeManifest); err != nil { - // Metadata storage failed. Log the error here, but continue to allow the run - // to be updated as per usual. - glog.Errorf("Failed to record output artifacts: %+v", err) - } + if err := s.metadataStore.RecordOutputArtifacts(runID, storedManifest, workflowRuntimeManifest); err != nil { + // Metadata storage failed. Log the error here, but continue to allow the run + // to be updated as per usual. + glog.Errorf("Failed to record output artifacts: %+v", err) } sql, args, err := sq. 
From 3b2dcebc4d0e0a6727d1adfacca2eda722f48ac7 Mon Sep 17 00:00:00 2001 From: Ajay Gopinathan Date: Tue, 19 Mar 2019 10:32:38 -0700 Subject: [PATCH 2/2] move select for update to the db interface --- backend/src/apiserver/storage/db.go | 12 ++++++++++++ backend/src/apiserver/storage/run_store.go | 12 +++--------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/backend/src/apiserver/storage/db.go b/backend/src/apiserver/storage/db.go index 44ec8e12cbf..e558989d1f4 100644 --- a/backend/src/apiserver/storage/db.go +++ b/backend/src/apiserver/storage/db.go @@ -52,6 +52,10 @@ type SQLDialect interface { // Check whether the error is a SQL duplicate entry error or not IsDuplicateError(err error) bool + + // SelectForUpdate returns query amended so that the selected rows are locked + // for update; a dialect may return query unchanged (as SQLiteDialect does). + SelectForUpdate(query string) string } // MySQLDialect implements SQLDialect with mysql dialect implementation. @@ -103,6 +107,14 @@ func (d SQLiteDialect) Concat(exprs []string, separator string) string { return strings.Join(exprs, separatorSQL) } +func (d MySQLDialect) SelectForUpdate(query string) string { + return query + " FOR UPDATE" +} + +func (d SQLiteDialect) SelectForUpdate(query string) string { + return query +} + func (d SQLiteDialect) IsDuplicateError(err error) bool { sqlError, ok := err.(sqlite3.Error) return ok && sqlError.Code == sqlite3.ErrConstraint diff --git a/backend/src/apiserver/storage/run_store.go b/backend/src/apiserver/storage/run_store.go index 60cbeea0266..824594c8302 100644 --- a/backend/src/apiserver/storage/run_store.go +++ b/backend/src/apiserver/storage/run_store.go @@ -365,15 +365,9 @@ func (s *RunStore) UpdateRun(runID string, condition string, workflowRuntimeMani // new in the status of an Argo manifest. This means we need to keep track // manually here on what the previously updated state of the run is, to ensure // we do not add duplicate metadata. Hence the locking below.
- var query string - switch x := s.db.SQLDialect.(type) { - case MySQLDialect: - query = "SELECT WorkflowRuntimeManifest FROM run_details WHERE UUID = ? FOR UPDATE" - case SQLiteDialect: - query = "SELECT WorkflowRuntimeManifest FROM run_details WHERE UUID = ?" - default: - glog.Fatal("Unsupported SQL dialect: %v", x) - } + query := "SELECT WorkflowRuntimeManifest FROM run_details WHERE UUID = ?" + query = s.db.SelectForUpdate(query) + row := tx.QueryRow(query, runID) var storedManifest string if err := row.Scan(&storedManifest); err != nil {