Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fake metadata store and fix tests. #958

Merged
merged 2 commits into from
Mar 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,23 @@ Pipelines backend.
All components can be built using [Bazel](https://bazel.build/). To build
everything under backend, run:
```
bazel build //backend/...
bazel build --action_env=PATH --define=grpc_no_ares=true //backend/...
```

To run all tests:
```
bazel test //backend/...
bazel test --action_env=PATH --define=grpc_no_ares=true //backend/...
```

The API server itself can only be built/tested using Bazel. The following commands target building and testing just the API server.
```
bazel build --action_env=PATH --define=grpc_no_ares=true backend/src/apiserver/...
```
```
bazel test --action_env=PATH --define=grpc_no_ares=true backend/src/apiserver/...
```


## Building Go client library and swagger files
After making changes to proto files, the Go client libraries and swagger
files need to be regenerated and checked-in. The backend/api/generate_api.sh
Expand Down
3 changes: 3 additions & 0 deletions backend/src/apiserver/resource/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//backend/api:go_default_library",
"//backend/src/apiserver/common:go_default_library",
"//backend/src/apiserver/list:go_default_library",
"//backend/src/apiserver/metadata:go_default_library",
"//backend/src/apiserver/model:go_default_library",
"//backend/src/apiserver/storage:go_default_library",
"//backend/src/common/util:go_default_library",
Expand All @@ -25,6 +26,8 @@ go_library(
"@com_github_argoproj_argo//pkg/client/clientset/versioned/typed/workflow/v1alpha1:go_default_library",
"@com_github_golang_glog//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@google_ml_metadata//ml_metadata/metadata_store:metadata_store_go",
"@google_ml_metadata//ml_metadata/proto:metadata_store_go_proto",
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
"@io_k8s_apimachinery//pkg/types:go_default_library",
"@io_k8s_apimachinery//pkg/watch:go_default_library",
Expand Down
18 changes: 17 additions & 1 deletion backend/src/apiserver/resource/client_manager_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
package resource

import (
"ml_metadata/metadata_store/mlmetadata"
mlpb "ml_metadata/proto/metadata_store_go_proto"

workflowclient "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/golang/glog"
"github.com/kubeflow/pipelines/backend/src/apiserver/metadata"
"github.com/kubeflow/pipelines/backend/src/apiserver/storage"
"github.com/kubeflow/pipelines/backend/src/common/util"
scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1"
Expand Down Expand Up @@ -64,7 +68,7 @@ func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterf
experimentStore: storage.NewExperimentStore(db, time, uuid),
pipelineStore: storage.NewPipelineStore(db, time, uuid),
jobStore: storage.NewJobStore(db, time),
runStore: storage.NewRunStore(db, time, nil),
runStore: storage.NewRunStore(db, time, initFakeMetadataStore()),
workflowClientFake: NewWorkflowClientFake(),
resourceReferenceStore: storage.NewResourceReferenceStore(db),
dBStatusStore: storage.NewDBStatusStore(db),
Expand All @@ -75,6 +79,18 @@ func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterf
}, nil
}

func initFakeMetadataStore() *metadata.Store {
cfg := &mlpb.ConnectionConfig{
Config: &mlpb.ConnectionConfig_FakeDatabase{&mlpb.FakeDatabaseConfig{}},
}

mlmdStore, err := mlmetadata.NewStore(cfg)
if err != nil {
glog.Fatalf("Failed to create ML Metadata store: %v", err)
}
return metadata.NewStore(mlmdStore)
}

func NewFakeClientManagerOrFatal(time util.TimeInterface) *FakeClientManager {
uuid := util.NewFakeUUIDGeneratorOrFatal(DefaultFakeUUID, nil)
fakeStore, err := NewFakeClientManager(time, uuid)
Expand Down
12 changes: 12 additions & 0 deletions backend/src/apiserver/storage/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type SQLDialect interface {

// Check whether the error is a SQL duplicate entry error or not
IsDuplicateError(err error) bool

// Modifies the SELECT clause in query to return one that locks the selected
// row for update.
SelectForUpdate(query string) string
}

// MySQLDialect implements SQLDialect with mysql dialect implementation.
Expand Down Expand Up @@ -103,6 +107,14 @@ func (d SQLiteDialect) Concat(exprs []string, separator string) string {
return strings.Join(exprs, separatorSQL)
}

func (d MySQLDialect) SelectForUpdate(query string) string {
return query + " FOR UPDATE"
}

func (d SQLiteDialect) SelectForUpdate(query string) string {
return query
}

func (d SQLiteDialect) IsDuplicateError(err error) bool {
sqlError, ok := err.(sqlite3.Error)
return ok && sqlError.Code == sqlite3.ErrConstraint
Expand Down
17 changes: 9 additions & 8 deletions backend/src/apiserver/storage/run_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,19 +365,20 @@ func (s *RunStore) UpdateRun(runID string, condition string, workflowRuntimeMani
// new in the status of an Argo manifest. This means we need to keep track
// manually here on what the previously updated state of the run is, to ensure
// we do not add duplicate metadata. Hence the locking below.
row := tx.QueryRow("SELECT WorkflowRuntimeManifest FROM run_details WHERE UUID = ? FOR UPDATE", runID)
query := "SELECT WorkflowRuntimeManifest FROM run_details WHERE UUID = ?"
query = s.db.SelectForUpdate(query)

row := tx.QueryRow(query, runID)
var storedManifest string
if err := row.Scan(&storedManifest); err != nil {
tx.Rollback()
return util.NewInternalServerError(err, "failed to find row with run id %q", runID)
return util.NewInvalidInputError("Failed to update run %s. Row not found.", runID)
}

if s.metadataStore != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use the null design pattern here so that we don't forget to perform this check? (i.e. we have an interface with an actual implementation calling MySQL, and actual implementation calling SQLLite, and a "NullImplementation" that does not do anything.

This pattern really helps keep the code concise and easy to read by leveraging OO instead of requiring if/then/switch statement in several places.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the comparison to nil because I now expect s.metadataStore to always be non-nil, unlike before, when it was nil in tests.

Go is not an OO language, and I don't think it's a good practice to shoehorn OO design patterns in Go code.

if err := s.metadataStore.RecordOutputArtifacts(runID, storedManifest, workflowRuntimeManifest); err != nil {
// Metadata storage failed. Log the error here, but continue to allow the run
// to be updated as per usual.
glog.Errorf("Failed to record output artifacts: %+v", err)
}
if err := s.metadataStore.RecordOutputArtifacts(runID, storedManifest, workflowRuntimeManifest); err != nil {
// Metadata storage failed. Log the error here, but continue to allow the run
// to be updated as per usual.
glog.Errorf("Failed to record output artifacts: %+v", err)
}

sql, args, err := sq.
Expand Down