Skip to content

Commit

Permalink
refactor: remove old lineage code (#94)
Browse files Browse the repository at this point in the history
* refactor: remove old lineage code

* fix: typo on upsertAssertPayload
  • Loading branch information
StewartJingga authored Mar 17, 2022
1 parent 1aaa0ed commit 56f07e1
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 49 deletions.
32 changes: 9 additions & 23 deletions api/handlers/asset_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,14 @@ func (h *AssetHandler) Upsert(w http.ResponseWriter, r *http.Request) {
return
}

var ast asset.Asset
err := json.NewDecoder(r.Body).Decode(&ast)
var payload upsertAssetPayload
err := json.NewDecoder(r.Body).Decode(&payload)
if err != nil {
WriteJSONError(w, http.StatusBadRequest, bodyParserErrorMsg(err))
return
}

ast := payload.Asset
if err := h.validateAsset(ast); err != nil {
WriteJSONError(w, http.StatusBadRequest, err.Error())
return
Expand All @@ -132,7 +134,7 @@ func (h *AssetHandler) Upsert(w http.ResponseWriter, r *http.Request) {
return
}

if err := h.saveLineage(r.Context(), ast); err != nil {
if err := h.saveLineage(r.Context(), payload); err != nil {
internalServerError(w, h.logger, err.Error())
return
}
Expand Down Expand Up @@ -289,30 +291,14 @@ func (h *AssetHandler) buildAssetConfig(query url.Values) asset.Config {
return config
}

func (h *AssetHandler) saveLineage(ctx context.Context, ast asset.Asset) error {
func (h *AssetHandler) saveLineage(ctx context.Context, payload upsertAssetPayload) error {
ast := payload.Asset

node := lineage.Node{
URN: ast.URN,
Type: ast.Type,
Service: ast.Service,
}

upstreams := []lineage.Node{}
for _, n := range ast.Upstreams { // nolint:staticcheck
upstreams = append(upstreams, lineage.Node{
URN: n.URN,
Type: n.Type,
Service: n.Service,
})
}

downstreams := []lineage.Node{}
for _, n := range ast.Downstreams {
downstreams = append(downstreams, lineage.Node{
URN: n.URN,
Type: n.Type,
Service: n.Service,
})
}

return h.lineageRepo.Upsert(ctx, node, upstreams, downstreams)
return h.lineageRepo.Upsert(ctx, node, payload.Upstreams, payload.Downstreams)
}
24 changes: 10 additions & 14 deletions api/handlers/asset_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,15 @@ func TestAssetHandlerUpsert(t *testing.T) {
Service: "kafka",
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
Upstreams: []asset.LineageRecord{
{URN: "upstream-1", Type: asset.TypeJob, Service: "optimus"},
},
Downstreams: []asset.LineageRecord{
{URN: "downstream-1", Type: asset.TypeDashboard, Service: "metabase"},
{URN: "downstream-2", Type: asset.TypeDashboard, Service: "tableau"},
},
}
upstreams := []lineage.Node{
{URN: "upstream-1", Type: asset.TypeJob, Service: "optimus"},
}
downstreams := []lineage.Node{
{URN: "downstream-1", Type: asset.TypeDashboard, Service: "metabase"},
{URN: "downstream-2", Type: asset.TypeDashboard, Service: "tableau"},
}

assetWithID := ast
assetWithID.ID = uuid.New().String()

Expand All @@ -231,13 +232,8 @@ func TestAssetHandlerUpsert(t *testing.T) {
Type: ast.Type,
Service: ast.Service,
},
[]lineage.Node{
{URN: "upstream-1", Type: asset.TypeJob, Service: "optimus"},
},
[]lineage.Node{
{URN: "downstream-1", Type: asset.TypeDashboard, Service: "metabase"},
{URN: "downstream-2", Type: asset.TypeDashboard, Service: "tableau"},
},
upstreams,
downstreams,
).Return(nil)
defer lr.AssertExpectations(t)

Expand Down
12 changes: 12 additions & 0 deletions api/handlers/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package handlers

import (
"github.com/odpf/columbus/asset"
"github.com/odpf/columbus/lineage"
)

type upsertAssetPayload struct {
asset.Asset
Upstreams []lineage.Node `json:"upstreams"`
Downstreams []lineage.Node `json:"downstreams"`
}
9 changes: 0 additions & 9 deletions asset/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,10 @@ type Asset struct {
Version string `json:"version" diff:"-"`
UpdatedBy user.User `json:"updated_by" diff:"-"`
Changelog diff.Changelog `json:"changelog,omitempty" diff:"-"`
// Deprecated: this is only to support old lineage
Upstreams []LineageRecord `json:"upstreams"`
Downstreams []LineageRecord `json:"downstreams"`
}

// Diff returns nil changelog with nil error if equal
// returns wrapped r3labs/diff Changelog struct with nil error if not equal
func (a *Asset) Diff(otherAsset *Asset) (diff.Changelog, error) {
return diff.Diff(a, otherAsset, diff.DiscardComplexOrigin(), diff.AllowTypeMismatch(true))
}

type LineageRecord struct {
URN string `json:"urn"`
Type Type `json:"type"`
Service string `json:"service"`
}
6 changes: 3 additions & 3 deletions lineage/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Edge struct {
}

type Node struct {
URN string
Type asset.Type
Service string
URN string `json:"urn"`
Type asset.Type `json:"type"`
Service string `json:"service"`
}

0 comments on commit 56f07e1

Please sign in to comment.