Skip to content

Commit

Permalink
feat: add filtering in assets api (#100)
Browse files Browse the repository at this point in the history
* feat: add sort filters in assets api

* feat: add types and servcie filters

* feat: update types filter

* refactor: update type filtering

* fix: asset filtering test

* chore: add query filters

* fix: handlers test

* test: add test for query filters

* chore: update data filter

* feat: filter assets using its data fields

* test: update the filters test

* refactor: update filter query params

* refactor: update data filters query params format

* refactor: update unessential changes

* chore: update swagger

* refactor: update required filter changes

* refactor: update fixture

* refactor: update filter query param format

* refactor: update v1beta1 asset config format

* refactor: update data query format

* refactor: update swagger

* revert: update coverage file format

* refactor: update empty data filter check approach

Co-authored-by: Stewart Jingga <stewart_jingga@yahoo.com>
  • Loading branch information
scortier and StewartJingga authored Apr 1, 2022
1 parent 970c43c commit a88b6fe
Show file tree
Hide file tree
Showing 12 changed files with 509 additions and 114 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

# Output of the go coverage tool, specifically when used with LiteIDE
*.out
coverage.txt

# Dependency directories (remove the comment below to include it)
vendor/
Expand Down
109 changes: 94 additions & 15 deletions api/httpapi/handlers/asset_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"net/url"
"strconv"
"strings"

"github.com/gorilla/mux"
"github.com/odpf/salt/log"
Expand All @@ -19,6 +20,10 @@ import (
"github.com/odpf/columbus/user"
)

var (
dataFilterPrefix = "data"
)

// AssetHandler exposes a REST interface to types
type AssetHandler struct {
logger log.Logger
Expand Down Expand Up @@ -47,8 +52,13 @@ func NewAssetHandler(
}

func (h *AssetHandler) GetAll(w http.ResponseWriter, r *http.Request) {
config := h.buildAssetConfig(r.URL.Query())
assets, err := h.assetRepo.GetAll(r.Context(), config)
cfg, err := h.buildAssetConfig(r.URL.Query())
if err != nil {
WriteJSONError(w, http.StatusBadRequest, bodyParserErrorMsg(err))
return
}

assets, err := h.assetRepo.GetAll(r.Context(), cfg)
if err != nil {
internalServerError(w, h.logger, err.Error())
return
Expand All @@ -61,9 +71,15 @@ func (h *AssetHandler) GetAll(w http.ResponseWriter, r *http.Request) {
withTotal, ok := r.URL.Query()["with_total"]
if ok && len(withTotal) > 0 && withTotal[0] != "false" && withTotal[0] != "0" {
total, err := h.assetRepo.GetCount(r.Context(), asset.Config{
Type: config.Type,
Service: config.Service,
Text: config.Text,
Types: cfg.Types,
Services: cfg.Services,
Size: cfg.Size,
Offset: cfg.Offset,
SortBy: cfg.SortBy,
SortDirection: cfg.SortDirection,
QueryFields: cfg.QueryFields,
Query: cfg.Query,
Data: cfg.Data,
})
if err != nil {
internalServerError(w, h.logger, err.Error())
Expand Down Expand Up @@ -253,12 +269,16 @@ func (h *AssetHandler) GetStargazers(w http.ResponseWriter, r *http.Request) {
}

func (h *AssetHandler) GetVersionHistory(w http.ResponseWriter, r *http.Request) {
config := h.buildAssetConfig(r.URL.Query())
cfg, err := h.buildAssetConfig(r.URL.Query())
if err != nil {
WriteJSONError(w, http.StatusBadRequest, bodyParserErrorMsg(err))
return
}

pathParams := mux.Vars(r)
assetID := pathParams["id"]

assetVersions, err := h.assetRepo.GetVersionHistory(r.Context(), config, assetID)
assetVersions, err := h.assetRepo.GetVersionHistory(r.Context(), cfg, assetID)
if err != nil {
if errors.As(err, new(asset.InvalidError)) {
WriteJSONError(w, http.StatusBadRequest, err.Error())
Expand Down Expand Up @@ -363,29 +383,88 @@ func (h *AssetHandler) validatePatchPayload(assetPayload map[string]interface{})
return
}

func (h *AssetHandler) buildAssetConfig(query url.Values) asset.Config {
config := asset.Config{
Text: query.Get("text"),
Type: asset.Type(query.Get("type")),
Service: query.Get("service"),
func (h *AssetHandler) buildAssetConfig(query url.Values) (cfg asset.Config, err error) {
cfg = asset.Config{
SortBy: query.Get("sort"),
SortDirection: query.Get("direction"),
Query: query.Get("q"),
}

types := query.Get("types")
if types != "" {
typ := strings.Split(types, ",")
for _, typeVal := range typ {
cfg.Types = append(cfg.Types, asset.Type(typeVal))
}
}

services := query.Get("services")
if services != "" {
cfg.Services = strings.Split(services, ",")
}

queriesFields := query.Get("q_fields")
if queriesFields != "" {
cfg.QueryFields = strings.Split(queriesFields, ",")
}

sizeString := query.Get("size")
if sizeString != "" {
size, err := strconv.Atoi(sizeString)
if err == nil {
config.Size = size
cfg.Size = size
}
}

offsetString := query.Get("offset")
if offsetString != "" {
offset, err := strconv.Atoi(offsetString)
if err == nil {
config.Offset = offset
cfg.Offset = offset
}
}

return config
cfg.Data = dataAssetConfigValue(query)
if err = cfg.Validate(); err != nil {
return asset.Config{}, err
}

return cfg, nil
}

func dataAssetConfigValue(queryString url.Values) map[string]string {
dataFilter := make(map[string]string)
preChar := "["
postChar := "]"

// Get substring between two strings.
for key, values := range queryString {
if !strings.HasPrefix(key, dataFilterPrefix) {
continue
}

posFirst := strings.Index(key, preChar)
if posFirst == -1 {
return nil
}
posLast := strings.Index(key, postChar)
if posLast == -1 {
return nil
}
posFirstAdjusted := posFirst + len(preChar)
if posFirstAdjusted >= posLast {
return nil
}

filterKey := key[posFirstAdjusted:posLast]
dataFilter[filterKey] = values[0] // cannot have duplicate query key, always get the first one
}

if len(dataFilter) == 0 {
return nil
}

return dataFilter
}

func (h *AssetHandler) saveLineage(ctx context.Context, ast asset.Asset, upstreams, downstreams []lineage.Node) error {
Expand Down
88 changes: 72 additions & 16 deletions api/httpapi/handlers/asset_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,15 +690,72 @@ func TestAssetHandlerGet(t *testing.T) {
},
{
Description: `should parse querystring to get config`,
Querystring: "?text=asd&type=table&service=bigquery&size=30&offset=50",
Querystring: "?types=table&services=bigquery&size=30&offset=50&sort=created_at&direction=desc&data[dataset]=booking&data[project]=p-godata-id&q=internal&q_fields=name,urn",
ExpectStatus: http.StatusOK,
Setup: func(ctx context.Context, ar *mocks.AssetRepository) {
ar.On("GetAll", ctx, asset.Config{
Text: "asd",
Type: "table",
Service: "bigquery",
Size: 30,
Offset: 50,
Types: []asset.Type{"table"},
Services: []string{"bigquery"},
Size: 30,
Offset: 50,
SortDirection: "desc",
SortBy: "created_at",
Data: map[string]string{
"dataset": "booking",
"project": "p-godata-id",
},
Query: "internal",
QueryFields: []string{"name", "urn"},
}).Return([]asset.Asset{}, nil, nil)
},
},
{
Description: `should parse data and query fields querystring to get config`,
Querystring: "?data[dataset]=booking&data[project]=p-godata-id&q=internal&q_fields=name,urn,description,services",
ExpectStatus: http.StatusOK,
Setup: func(ctx context.Context, ar *mocks.AssetRepository) {
ar.On("GetAll", ctx, asset.Config{
Data: map[string]string{
"dataset": "booking",
"project": "p-godata-id",
},
Query: "internal",
QueryFields: []string{"name", "urn", "description", "services"},
}).Return([]asset.Asset{}, nil, nil)
},
},
{
Description: `should parse data fields querystring to get config`,
Querystring: "?data[dataset]=booking&data[project]=p-godata-id",
ExpectStatus: http.StatusOK,
Setup: func(ctx context.Context, ar *mocks.AssetRepository) {
ar.On("GetAll", ctx, asset.Config{
Data: map[string]string{
"dataset": "booking",
"project": "p-godata-id",
},
}).Return([]asset.Asset{}, nil, nil)
},
},
{
Description: `should parse query fields querystring to get config`,
Querystring: "?q=internal&q_fields=name,urn,description,services",
ExpectStatus: http.StatusOK,
Setup: func(ctx context.Context, ar *mocks.AssetRepository) {
ar.On("GetAll", ctx, asset.Config{
Query: "internal",
QueryFields: []string{"name", "urn", "description", "services"},
}).Return([]asset.Asset{}, nil, nil)
},
},
{
Description: "should convert multiple types and services from querystring to config",
Querystring: "?types=table,job&services=bigquery,kafka",
ExpectStatus: http.StatusOK,
Setup: func(ctx context.Context, ar *mocks.AssetRepository) {
ar.On("GetAll", ctx, asset.Config{
Types: []asset.Type{"table", "job"},
Services: []string{"bigquery", "kafka"},
}).Return([]asset.Asset{}, nil, nil)
},
},
Expand Down Expand Up @@ -736,23 +793,23 @@ func TestAssetHandlerGet(t *testing.T) {
{
Description: "should return total in the payload if with_total flag is given",
ExpectStatus: http.StatusOK,
Querystring: "?with_total=true&text=dsa&type=job&service=kafka&size=10&offset=5",
Querystring: "?with_total=true&types=job&services=kafka&size=10&offset=5",
Setup: func(ctx context.Context, ar *mocks.AssetRepository) {
ar.On("GetAll", ctx, asset.Config{
Text: "dsa",
Type: "job",
Service: "kafka",
Size: 10,
Offset: 5,
Types: []asset.Type{"job"},
Services: []string{"kafka"},
Size: 10,
Offset: 5,
}).Return([]asset.Asset{
{ID: "testid-1"},
{ID: "testid-2"},
{ID: "testid-3"},
}, nil, nil)
ar.On("GetCount", ctx, asset.Config{
Text: "dsa",
Type: "job",
Service: "kafka",
Size: 10,
Offset: 5,
Types: []asset.Type{"job"},
Services: []string{"kafka"},
}).Return(150, nil, nil)
},
PostCheck: func(r *http.Response) error {
Expand Down Expand Up @@ -959,7 +1016,6 @@ func TestAssetHandlerGetVersionHistory(t *testing.T) {
"id": assetID,
})
rw := httptest.NewRecorder()

ar := new(mocks.AssetRepository)
tc.Setup(rr.Context(), ar)

Expand Down
12 changes: 8 additions & 4 deletions api/v1beta1/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ import (
)

func (h *Handler) GetAllAssets(ctx context.Context, req *compassv1beta1.GetAllAssetsRequest) (*compassv1beta1.GetAllAssetsResponse, error) {
config := asset.Config{
config := asset.GRPCConfig{
Text: req.GetText(),
Type: asset.Type(req.GetType()),
Service: req.GetService(),
Size: int(req.GetSize()),
Offset: int(req.GetOffset()),
}
cfg := config.ToConfig()

assets, err := h.AssetRepository.GetAll(ctx, config)
assets, err := h.AssetRepository.GetAll(ctx, cfg)
if err != nil {
return nil, internalServerError(h.Logger, err.Error())
}
Expand All @@ -42,11 +43,14 @@ func (h *Handler) GetAllAssets(ctx context.Context, req *compassv1beta1.GetAllAs
}

if req.GetWithTotal() {
total, err := h.AssetRepository.GetCount(ctx, asset.Config{
grpcConfig := asset.GRPCConfig{
Type: config.Type,
Service: config.Service,
Text: config.Text,
})
}
cfg = grpcConfig.ToConfig()

total, err := h.AssetRepository.GetCount(ctx, cfg)
if err != nil {
return nil, internalServerError(h.Logger, err.Error())
}
Expand Down
14 changes: 8 additions & 6 deletions api/v1beta1/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@ func TestGetAllAssets(t *testing.T) {
},
ExpectStatus: codes.OK,
Setup: func(ctx context.Context, ar *mocks.AssetRepository) {
ar.On("GetAll", ctx, asset.Config{
cfg := asset.GRPCConfig{
Text: "asd",
Type: "table",
Service: "bigquery",
Size: 30,
Offset: 50,
}).Return([]asset.Asset{}, nil, nil)
}
config := cfg.ToConfig()
ar.On("GetAll", ctx, config).Return([]asset.Asset{}, nil, nil)
},
},
{
Expand Down Expand Up @@ -108,22 +110,22 @@ func TestGetAllAssets(t *testing.T) {
WithTotal: true,
},
Setup: func(ctx context.Context, ar *mocks.AssetRepository) {
ar.On("GetAll", ctx, asset.Config{
ar.On("GetAll", ctx, asset.GRPCConfig{
Text: "dsa",
Type: "job",
Service: "kafka",
Size: 10,
Offset: 5,
}).Return([]asset.Asset{
}.ToConfig()).Return([]asset.Asset{
{ID: "testid-1"},
{ID: "testid-2"},
{ID: "testid-3"},
}, nil, nil)
ar.On("GetCount", ctx, asset.Config{
ar.On("GetCount", ctx, asset.GRPCConfig{
Text: "dsa",
Type: "job",
Service: "kafka",
}).Return(150, nil, nil)
}.ToConfig()).Return(150, nil, nil)
},
PostCheck: func(resp *compassv1beta1.GetAllAssetsResponse) error {
expected := &compassv1beta1.GetAllAssetsResponse{
Expand Down
8 changes: 0 additions & 8 deletions asset/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,6 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

type Config struct {
Text string `json:"text"`
Type Type `json:"type"`
Service string `json:"service"`
Size int `json:"size"`
Offset int `json:"offset"`
}

type Repository interface {
GetAll(context.Context, Config) ([]Asset, error)
GetCount(context.Context, Config) (int, error)
Expand Down
Loading

0 comments on commit a88b6fe

Please sign in to comment.