Skip to content

Commit

Permalink
feat: new lineage module with postgres (#86)
Browse files Browse the repository at this point in the history
* chore: revert back lineage to use discovery data for backward comp

* feat: add new lineage module using postgres

* feat(lineage): add /v1beta2/lineage api

* fix: linter error

* refactor: remove unnecessary transactions

* fix: invalid comment

* refactor: logging internal server error properly

* feat: move old lineage to /v1

* refactor: remove identical route assignment

* chore: remove /v1 apis

* chore: remove old lineage module

* refactor: rename handler

* chore: update api docs
  • Loading branch information
StewartJingga authored Mar 10, 2022
1 parent 91df258 commit 5ace2b2
Show file tree
Hide file tree
Showing 34 changed files with 923 additions and 1,225 deletions.
75 changes: 57 additions & 18 deletions api/handlers/asset_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package handlers

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -13,36 +14,41 @@ import (

"github.com/odpf/columbus/asset"
"github.com/odpf/columbus/discovery"
"github.com/odpf/columbus/lineage"
"github.com/odpf/columbus/star"
"github.com/odpf/columbus/user"
)

// AssetHandler exposes a REST interface to types
type AssetHandler struct {
logger log.Logger
assetRepository asset.Repository
discoveryRepo discovery.Repository
starRepository star.Repository
logger log.Logger
assetRepo asset.Repository
discoveryRepo discovery.Repository
starRepo star.Repository
lineageRepo lineage.Repository
}

func NewAssetHandler(
logger log.Logger,
assetRepository asset.Repository,
assetRepo asset.Repository,
discoveryRepo discovery.Repository,
starRepository star.Repository) *AssetHandler {
starRepo star.Repository,
lineageRepo lineage.Repository,
) *AssetHandler {
handler := &AssetHandler{
logger: logger,
assetRepository: assetRepository,
discoveryRepo: discoveryRepo,
starRepository: starRepository,
logger: logger,
assetRepo: assetRepo,
discoveryRepo: discoveryRepo,
starRepo: starRepo,
lineageRepo: lineageRepo,
}

return handler
}

func (h *AssetHandler) GetAll(w http.ResponseWriter, r *http.Request) {
config := h.buildAssetConfig(r.URL.Query())
assets, err := h.assetRepository.GetAll(r.Context(), config)
assets, err := h.assetRepo.GetAll(r.Context(), config)
if err != nil {
internalServerError(w, h.logger, err.Error())
return
Expand All @@ -54,7 +60,7 @@ func (h *AssetHandler) GetAll(w http.ResponseWriter, r *http.Request) {

withTotal, ok := r.URL.Query()["with_total"]
if ok && len(withTotal) > 0 && withTotal[0] != "false" && withTotal[0] != "0" {
total, err := h.assetRepository.GetCount(r.Context(), asset.Config{
total, err := h.assetRepo.GetCount(r.Context(), asset.Config{
Type: config.Type,
Service: config.Service,
Text: config.Text,
Expand All @@ -73,7 +79,7 @@ func (h *AssetHandler) GetByID(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
assetID := vars["id"]

ast, err := h.assetRepository.GetByID(r.Context(), assetID)
ast, err := h.assetRepo.GetByID(r.Context(), assetID)
if err != nil {
if errors.As(err, new(asset.InvalidError)) {
WriteJSONError(w, http.StatusBadRequest, err.Error())
Expand Down Expand Up @@ -110,7 +116,7 @@ func (h *AssetHandler) Upsert(w http.ResponseWriter, r *http.Request) {
}

ast.UpdatedBy.ID = userID
assetID, err := h.assetRepository.Upsert(r.Context(), &ast)
assetID, err := h.assetRepo.Upsert(r.Context(), &ast)
if errors.As(err, new(asset.InvalidError)) {
WriteJSONError(w, http.StatusBadRequest, err.Error())
return
Expand All @@ -126,6 +132,11 @@ func (h *AssetHandler) Upsert(w http.ResponseWriter, r *http.Request) {
return
}

if err := h.saveLineage(r.Context(), ast); err != nil {
internalServerError(w, h.logger, err.Error())
return
}

writeJSON(w, http.StatusOK, map[string]interface{}{
"id": ast.ID,
})
Expand All @@ -135,7 +146,7 @@ func (h *AssetHandler) Delete(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
assetID := vars["id"]

if err := h.assetRepository.Delete(r.Context(), assetID); err != nil {
if err := h.assetRepo.Delete(r.Context(), assetID); err != nil {
if errors.As(err, new(asset.InvalidError)) {
WriteJSONError(w, http.StatusBadRequest, err.Error())
return
Expand All @@ -162,7 +173,7 @@ func (h *AssetHandler) GetStargazers(w http.ResponseWriter, r *http.Request) {
pathParams := mux.Vars(r)
assetID := pathParams["id"]

users, err := h.starRepository.GetStargazers(r.Context(), starCfg, assetID)
users, err := h.starRepo.GetStargazers(r.Context(), starCfg, assetID)
if err != nil {
if errors.Is(err, star.ErrEmptyUserID) || errors.Is(err, star.ErrEmptyAssetID) || errors.As(err, new(star.InvalidError)) {
WriteJSONError(w, http.StatusBadRequest, err.Error())
Expand All @@ -185,7 +196,7 @@ func (h *AssetHandler) GetVersionHistory(w http.ResponseWriter, r *http.Request)
pathParams := mux.Vars(r)
assetID := pathParams["id"]

assetVersions, err := h.assetRepository.GetVersionHistory(r.Context(), config, assetID)
assetVersions, err := h.assetRepo.GetVersionHistory(r.Context(), config, assetID)
if err != nil {
if errors.As(err, new(asset.InvalidError)) {
WriteJSONError(w, http.StatusBadRequest, err.Error())
Expand Down Expand Up @@ -213,7 +224,7 @@ func (h *AssetHandler) GetByVersion(w http.ResponseWriter, r *http.Request) {
return
}

ast, err := h.assetRepository.GetByVersion(r.Context(), assetID, version)
ast, err := h.assetRepo.GetByVersion(r.Context(), assetID, version)
if err != nil {
if errors.As(err, new(asset.InvalidError)) {
WriteJSONError(w, http.StatusBadRequest, err.Error())
Expand Down Expand Up @@ -277,3 +288,31 @@ func (h *AssetHandler) buildAssetConfig(query url.Values) asset.Config {

return config
}

func (h *AssetHandler) saveLineage(ctx context.Context, ast asset.Asset) error {
node := lineage.Node{
URN: ast.URN,
Type: ast.Type,
Service: ast.Service,
}

upstreams := []lineage.Node{}
for _, n := range ast.Upstreams { // nolint:staticcheck
upstreams = append(upstreams, lineage.Node{
URN: n.URN,
Type: n.Type,
Service: n.Service,
})
}

downstreams := []lineage.Node{}
for _, n := range ast.Downstreams {
downstreams = append(downstreams, lineage.Node{
URN: n.URN,
Type: n.Type,
Service: n.Service,
})
}

return h.lineageRepo.Upsert(ctx, node, upstreams, downstreams)
}
Loading

0 comments on commit 5ace2b2

Please sign in to comment.