Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: store, retrieve asset.url #209

Merged
merged 1 commit into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/asset/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Asset struct {
Name string `json:"name" diff:"name"`
Description string `json:"description" diff:"description"`
Data map[string]interface{} `json:"data" diff:"data"`
URL string `json:"url" diff:"url"`
Labels map[string]string `json:"labels" diff:"labels"`
Owners []user.User `json:"owners,omitempty" diff:"owners"`
CreatedAt time.Time `json:"created_at" diff:"-"`
Expand Down
5 changes: 5 additions & 0 deletions core/asset/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func TestAssetPatch(t *testing.T) {
"service": "firehose",
"description": "new-description",
"name": "new-name",
"url": "https://sample-url.com",
"labels": map[string]string{
"bar": "foo",
"bar2": "foo2",
Expand All @@ -307,6 +308,7 @@ func TestAssetPatch(t *testing.T) {
Service: "firehose",
Description: "new-description",
Name: "new-name",
URL: "https://sample-url.com",
Labels: map[string]string{
"bar": "foo",
"bar2": "foo2",
Expand All @@ -325,6 +327,7 @@ func TestAssetPatch(t *testing.T) {
Service: "optimus",
Description: "sample-description",
Name: "old-name",
URL: "https://sample-url-old.com",
Labels: map[string]string{
"foo": "bar",
},
Expand All @@ -338,6 +341,7 @@ func TestAssetPatch(t *testing.T) {
"service": "firehose",
"description": "new-description",
"name": "new-name",
"url": "https://sample-url.com",
"labels": map[string]string{
"bar": "foo",
"bar2": "foo2",
Expand All @@ -353,6 +357,7 @@ func TestAssetPatch(t *testing.T) {
Service: "firehose",
Description: "new-description",
Name: "new-name",
URL: "https://sample-url.com",
Labels: map[string]string{
"bar": "foo",
"bar2": "foo2",
Expand Down
1 change: 1 addition & 0 deletions core/asset/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ func patchAsset(a *Asset, patchData map[string]interface{}) {
a.Service = patchString("service", patchData, a.Service)
a.Name = patchString("name", patchData, a.Name)
a.Description = patchString("description", patchData, a.Description)
a.URL = patchString("url", patchData, a.URL)

labels, exists := patchData["labels"]
if exists {
Expand Down
5 changes: 5 additions & 0 deletions internal/server/v1beta1/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ func (server *APIServer) buildAsset(baseAsset *compassv1beta1.UpsertAssetRequest
Name: baseAsset.GetName(),
Description: baseAsset.GetDescription(),
Data: baseAsset.GetData().AsMap(),
URL: baseAsset.Url,
Labels: baseAsset.GetLabels(),
}

Expand Down Expand Up @@ -500,6 +501,9 @@ func decodePatchAssetToMap(pb *compassv1beta1.UpsertPatchAssetRequest_Asset) map
if pb.GetData() != nil {
m["data"] = pb.GetData().AsMap()
}
if len(pb.Url) > 0 {
m["url"] = pb.Url
}
if pb.GetLabels() != nil {
m["labels"] = pb.GetLabels()
}
Expand Down Expand Up @@ -578,6 +582,7 @@ func assetToProto(a asset.Asset, withChangelog bool) (assetPB *compassv1beta1.As
Name: a.Name,
Description: a.Description,
Data: data,
Url: a.URL,
Labels: a.Labels,
Owners: owners,
Version: a.Version,
Expand Down
7 changes: 7 additions & 0 deletions internal/server/v1beta1/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ func TestUpsertAsset(t *testing.T) {
Name: "new-name",
Service: "kafka",
Data: &structpb.Struct{},
Url: "https://sample-url.com",
Owners: []*compassv1beta1.User{{Id: "id", Uuid: "", Email: "email@email.com", Provider: "provider"}},
},
Upstreams: []*compassv1beta1.LineageNode{
Expand Down Expand Up @@ -450,6 +451,7 @@ func TestUpsertAsset(t *testing.T) {
Service: "kafka",
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
URL: "https://sample-url.com",
Owners: []user.User{{ID: "id", UUID: "", Email: "email@email.com", Provider: "provider"}},
}
upstreams := []string{"upstream-1"}
Expand Down Expand Up @@ -520,6 +522,7 @@ func TestUpsertPatchAsset(t *testing.T) {
Name: wrapperspb.String("new-name"),
Service: "kafka",
Data: &structpb.Struct{},
Url: "https://sample-url.com",
Owners: []*compassv1beta1.User{{Id: "id", Uuid: "", Email: "email@email.com", Provider: "provider"}},
},
Upstreams: []*compassv1beta1.LineageNode{
Expand Down Expand Up @@ -549,6 +552,7 @@ func TestUpsertPatchAsset(t *testing.T) {
Service: "kafka",
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
URL: "https://sample-url-old.com",
Owners: []user.User{{ID: "id", UUID: "", Email: "email@email.com", Provider: "provider"}},
}
)
Expand Down Expand Up @@ -652,6 +656,7 @@ func TestUpsertPatchAsset(t *testing.T) {
Service: "kafka",
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
URL: "https://sample-url.com",
Owners: []user.User{{ID: "id", UUID: "", Email: "email@email.com", Provider: "provider"}},
}
upstreams := []string{"upstream-1"}
Expand Down Expand Up @@ -688,6 +693,7 @@ func TestUpsertPatchAsset(t *testing.T) {
Service: "kafka",
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
URL: "https://sample-url-old.com",
Owners: []user.User{{ID: "id", UUID: "", Email: "email@email.com", Provider: "provider"}},
}

Expand Down Expand Up @@ -733,6 +739,7 @@ func TestUpsertPatchAsset(t *testing.T) {
Service: "kafka",
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
URL: "https://sample-url-old.com",
Owners: []user.User{{ID: "id", UUID: "", Email: "email@email.com", Provider: "provider"}},
}

Expand Down
2 changes: 2 additions & 0 deletions internal/store/postgres/asset_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type AssetModel struct {
Service string `db:"service"`
Description string `db:"description"`
Data JSONMap `db:"data"`
URL string `db:"url"`
Labels JSONMap `db:"labels"`
Version string `db:"version"`
UpdatedBy UserModel `db:"updated_by"`
Expand All @@ -41,6 +42,7 @@ func (a *AssetModel) toAsset(owners []user.User) asset.Asset {
Service: a.Service,
Description: a.Description,
Data: a.Data,
URL: a.URL,
Labels: a.buildLabels(),
Owners: owners,
Version: a.Version,
Expand Down
38 changes: 20 additions & 18 deletions internal/store/postgres/asset_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,8 @@ func (r *AssetRepository) deleteWithPredicate(ctx context.Context, pred sq.Eq) (
func (r *AssetRepository) insert(ctx context.Context, ast *asset.Asset) (id string, err error) {
err = r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) error {
query, args, err := sq.Insert("assets").
Columns("urn", "type", "service", "name", "description", "data", "labels", "updated_by", "version").
Values(ast.URN, ast.Type, ast.Service, ast.Name, ast.Description, ast.Data, ast.Labels, ast.UpdatedBy.ID, asset.BaseVersion).
Columns("urn", "type", "service", "name", "description", "data", "url", "labels", "updated_by", "version").
Values(ast.URN, ast.Type, ast.Service, ast.Name, ast.Description, ast.Data, ast.URL, ast.Labels, ast.UpdatedBy.ID, asset.BaseVersion).
Suffix("RETURNING \"id\"").
PlaceholderFormat(sq.Dollar).
ToSql()
Expand Down Expand Up @@ -490,7 +490,6 @@ func (r *AssetRepository) insert(ctx context.Context, ast *asset.Asset) (id stri
}

func (r *AssetRepository) update(ctx context.Context, assetID string, newAsset *asset.Asset, oldAsset *asset.Asset, clog diff.Changelog) error {

if !isValidUUID(assetID) {
return asset.InvalidError{AssetID: assetID}
}
Expand All @@ -508,22 +507,24 @@ func (r *AssetRepository) update(ctx context.Context, assetID string, newAsset *
newAsset.Version = newVersion
newAsset.ID = oldAsset.ID

err = r.execContext(ctx, tx,
`UPDATE assets
SET urn = $1,
type = $2,
service = $3,
name = $4,
description = $5,
data = $6,
labels = $7,
updated_at = $8,
updated_by = $9,
version = $10
WHERE id = $11;
`,
newAsset.URN, newAsset.Type, newAsset.Service, newAsset.Name, newAsset.Description, newAsset.Data, newAsset.Labels, time.Now(), newAsset.UpdatedBy.ID, newAsset.Version, assetID)
query, args, err := r.buildSQL(sq.Update("assets").
Set("urn", newAsset.URN).
Set("type", newAsset.Type).
Set("service", newAsset.Service).
Set("name", newAsset.Name).
Set("description", newAsset.Description).
Set("data", newAsset.Data).
Set("url", newAsset.URL).
Set("labels", newAsset.Labels).
Set("updated_at", time.Now()).
Set("updated_by", newAsset.UpdatedBy.ID).
Set("version", newAsset.Version).
Where(sq.Eq{"id": assetID}))
if err != nil {
return fmt.Errorf("build query: %w", err)
}

if err := r.execContext(ctx, tx, query, args...); err != nil {
return fmt.Errorf("error running update asset query: %w", err)
}

Expand Down Expand Up @@ -775,6 +776,7 @@ func (r *AssetRepository) getAssetSQL() sq.SelectBuilder {
a.service as service,
a.description as description,
a.data as data,
COALESCE(a.url, '') as url,
Copy link

@haveiss haveiss Feb 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how this compared to set url default to '' when altering table?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Altering a table with default value used to take a lock for updating all rows (which would block reads and writes). This looks to have been fixed in PG 11 but I wasn't aware of that while making these changes.

a.labels as labels,
a.version as version,
a.created_at as created_at,
Expand Down
31 changes: 31 additions & 0 deletions internal/store/postgres/asset_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,7 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
Type: "table",
Service: "bigquery",
Version: "0.1",
URL: "https://sample-url.com",
UpdatedBy: r.users[0],
}
id, err := r.repository.Upsert(r.ctx, &ast)
Expand Down Expand Up @@ -1019,6 +1020,36 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
r.Equal(ast.ID, identicalAsset.ID)
})

r.Run("should update the asset if asset is not identical", func() {
ast := asset.Asset{
URN: "urn-u-2",
Type: "table",
Service: "bigquery",
URL: "https://sample-url-old.com",
UpdatedBy: r.users[0],
}

id, err := r.repository.Upsert(r.ctx, &ast)
r.Require().NoError(err)
r.NotEmpty(id)
ast.ID = id

updated := ast
updated.URL = "https://sample-url.com"

id, err = r.repository.Upsert(r.ctx, &updated)
r.Require().NoError(err)
r.NotEmpty(id)
updated.ID = id

r.Equal(ast.ID, updated.ID)

actual, err := r.repository.GetByID(r.ctx, ast.ID)
r.NoError(err)

r.Equal(updated.URL, actual.URL)
})

r.Run("should delete old owners if it does not exist on new asset", func() {
ast := asset.Asset{
URN: "urn-u-4",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE assets DROP COLUMN IF EXISTS url;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE assets ADD COLUMN IF NOT EXISTS url TEXT;