Skip to content

Commit

Permalink
feat(discovery): search by record's field name (#41)
Browse files Browse the repository at this point in the history
* feat(search): add search by field and support search by description field
feat(search): use query string to search by field
chore: comment out elasticsearch logger
fix(discovery): update search by field query to match query
chore(discovery): update swagger search api with searchby query param

* feat(record): add new email field in owner struct

* feat(discovery): update search by field to accept map string

* feat(discovery): add test to filter by a field

* fix(discovery): append keyword to filter field

* fix(discovery): add filter match query
  • Loading branch information
mabdh authored Dec 17, 2021
1 parent 9fc7637 commit a0c1a23
Show file tree
Hide file tree
Showing 10 changed files with 316 additions and 39 deletions.
3 changes: 3 additions & 0 deletions api/handlers/search_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
var (
filterPrefix = "filter."
whiteListQueryParamKey = "filter.type"

queryPrefix = "query."
)

type SearchHandler struct {
Expand Down Expand Up @@ -77,6 +79,7 @@ func (handler *SearchHandler) buildSearchCfg(params url.Values) (cfg discovery.S
cfg.MaxResults, _ = strconv.Atoi(params.Get("size"))
cfg.Filters = filterConfigFromValues(params)
cfg.RankBy = params.Get("rankby")
cfg.Queries = queryConfigFromValues(params)
cfg.TypeWhiteList, err = parseTypeWhiteList(params)
return
}
Expand Down
33 changes: 32 additions & 1 deletion api/handlers/search_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,30 @@ func TestSearchHandlerSearch(t *testing.T) {
"service": {"kafka", "rabbitmq"},
"data.landscape": {"th"},
},
Queries: make(map[string]string),
}

searcher.On("Search", ctx, cfg).Return([]discovery.SearchResult{}, nil)
},
ValidateResponse: func(tc testCase, body io.Reader) error {
return nil
},
},
{
Title: "should pass queries to search config format",
Querystring: "text=resource&landscape=id,vn&filter.data.landscape=th&filter.type=topic&filter.service=kafka,rabbitmq&query.data.columns.name=timestamp&query.owners.email=john.doe@email.com",
InitSearcher: func(tc testCase, searcher *mock.RecordSearcher) {
cfg := discovery.SearchConfig{
Text: "resource",
TypeWhiteList: []string{"topic"},
Filters: map[string][]string{
"service": {"kafka", "rabbitmq"},
"data.landscape": {"th"},
},
Queries: map[string]string{
"data.columns.name": "timestamp",
"owners.email": "john.doe@email.com",
},
}

searcher.On("Search", ctx, cfg).Return([]discovery.SearchResult{}, nil)
Expand All @@ -73,6 +97,7 @@ func TestSearchHandlerSearch(t *testing.T) {
cfg := discovery.SearchConfig{
Text: "test",
Filters: make(map[string][]string),
Queries: make(map[string]string),
}
response := []discovery.SearchResult{
{
Expand Down Expand Up @@ -123,6 +148,7 @@ func TestSearchHandlerSearch(t *testing.T) {
Text: "resource",
MaxResults: 10,
Filters: make(map[string][]string),
Queries: make(map[string]string),
}

var results []discovery.SearchResult
Expand Down Expand Up @@ -221,14 +247,15 @@ func TestSearchHandlerSuggest(t *testing.T) {
cfg := discovery.SearchConfig{
Text: "test",
Filters: map[string][]string{},
Queries: make(map[string]string),
}
searcher.On("Suggest", ctx, cfg).Return([]string{}, fmt.Errorf("service unavailable"))
},
ExpectStatus: http.StatusInternalServerError,
},
{
Title: "should pass filter to search config format",
Querystring: "text=resource&landscape=id,vn&filter.data.landscape=th&filter.type=topic&filter.service=kafka,rabbitmq",
Querystring: "text=resource&landscape=id,vn&query.description=this is my dashboard&filter.data.landscape=th&filter.type=topic&filter.service=kafka,rabbitmq",
InitSearcher: func(tc testCase, searcher *mock.RecordSearcher) {
cfg := discovery.SearchConfig{
Text: "resource",
Expand All @@ -237,6 +264,9 @@ func TestSearchHandlerSuggest(t *testing.T) {
"service": {"kafka", "rabbitmq"},
"data.landscape": {"th"},
},
Queries: map[string]string{
"description": "this is my dashboard",
},
}

searcher.On("Suggest", ctx, cfg).Return([]string{}, nil)
Expand All @@ -252,6 +282,7 @@ func TestSearchHandlerSuggest(t *testing.T) {
cfg := discovery.SearchConfig{
Text: "test",
Filters: make(map[string][]string),
Queries: make(map[string]string),
}
response := []string{
"test",
Expand Down
14 changes: 14 additions & 0 deletions api/handlers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,17 @@ func filterConfigFromValues(querystring url.Values) map[string][]string {
}
return filter
}

func queryConfigFromValues(querystring url.Values) map[string]string {
var query = make(map[string]string)
for key, values := range querystring {
// filters are of form "query.{field}"
if !strings.HasPrefix(key, queryPrefix) {
continue
}

queryKey := strings.TrimPrefix(key, queryPrefix)
query[queryKey] = values[0] // cannot have duplicate query key, always get the first one
}
return query
}
6 changes: 6 additions & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ func initElasticsearch(config Config) *elasticsearch.Client {
esClient, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: brokers,
Transport: nrelasticsearch.NewRoundTripper(nil),
// uncomment below code to debug request and response to elasticsearch
// Logger: &estransport.ColorLogger{
// Output: os.Stdout,
// EnableRequestBody: true,
// EnableResponseBody: true,
// },
})
if err != nil {
log.Fatalf("error connecting to elasticsearch: %v", err)
Expand Down
3 changes: 3 additions & 0 deletions discovery/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ type SearchConfig struct {

// RankBy is a param to rank based on a specific parameter
RankBy string

// Queries is a param to search a resource based on record's fields
Queries map[string]string
}
8 changes: 8 additions & 0 deletions record/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Record struct {
Data map[string]interface{} `json:"data"`
Labels map[string]string `json:"labels"`
Tags []string `json:"tags"`
Owners []Owner `json:"owners"`
Upstreams []LineageRecord `json:"upstreams"`
Downstreams []LineageRecord `json:"downstreams"`
CreatedAt time.Time `json:"created_at"`
Expand All @@ -25,6 +26,13 @@ type LineageRecord struct {
Type string `json:"type"`
}

type Owner struct {
URN string `json:"urn"`
Name string `json:"name"`
Role string `json:"role"`
Email string `json:"email"`
}

type ErrNoSuchRecord struct {
RecordID string
}
Expand Down
24 changes: 22 additions & 2 deletions store/elasticsearch/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ func (sr *Searcher) buildQuery(ctx context.Context, cfg discovery.SearchConfig,
var query elastic.Query

query = sr.buildTextQuery(ctx, cfg.Text)
query = sr.buildFilterQueries(query, cfg.Filters)
query = sr.buildFilterTermQueries(query, cfg.Filters)
query = sr.buildFilterMatchQueries(ctx, query, cfg.Queries)
query = sr.buildFunctionScoreQuery(query, cfg.RankBy)

src, err := query.Source()
Expand Down Expand Up @@ -230,7 +231,25 @@ func (sr *Searcher) buildTextQuery(ctx context.Context, text string) elastic.Que
)
}

func (sr *Searcher) buildFilterQueries(query elastic.Query, filters map[string][]string) elastic.Query {
func (sr *Searcher) buildFilterMatchQueries(ctx context.Context, query elastic.Query, queries map[string]string) elastic.Query {
if len(queries) == 0 {
return query
}

esQueries := []elastic.Query{}
for field, value := range queries {
esQueries = append(esQueries,
elastic.
NewMatchQuery(field, value).
Fuzziness("AUTO"))
}

return elastic.NewBoolQuery().
Should(query).
Filter(esQueries...)
}

func (sr *Searcher) buildFilterTermQueries(query elastic.Query, filters map[string][]string) elastic.Query {
if len(filters) == 0 {
return query
}
Expand All @@ -246,6 +265,7 @@ func (sr *Searcher) buildFilterQueries(query elastic.Query, filters map[string][
values = append(values, rawVal)
}

key := fmt.Sprintf("%s.keyword", key)
filterQueries = append(
filterQueries,
elastic.NewTermsQuery(key, values...),
Expand Down
41 changes: 41 additions & 0 deletions store/elasticsearch/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func TestSearcherSearch(t *testing.T) {
{Type: "topic", RecordID: "order-topic"},
{Type: "topic", RecordID: "purchase-topic"},
{Type: "topic", RecordID: "consumer-topic"},
{Type: "topic", RecordID: "consumer-mq-2"},
},
},
{
Expand All @@ -78,6 +79,7 @@ func TestSearcherSearch(t *testing.T) {
{Type: "topic", RecordID: "order-topic"},
{Type: "topic", RecordID: "purchase-topic"},
{Type: "topic", RecordID: "consumer-topic"},
{Type: "topic", RecordID: "consumer-mq-2"},
},
},
{
Expand Down Expand Up @@ -115,6 +117,7 @@ func TestSearcherSearch(t *testing.T) {
Expected: []expectedRow{
{Type: "topic", RecordID: "order-topic"},
{Type: "topic", RecordID: "consumer-topic"},
{Type: "topic", RecordID: "consumer-mq-2"},
},
},
{
Expand All @@ -127,6 +130,19 @@ func TestSearcherSearch(t *testing.T) {
"data.company": {"odpf"},
},
},
Expected: []expectedRow{
{Type: "topic", RecordID: "consumer-topic"},
{Type: "topic", RecordID: "consumer-mq-2"},
},
},
{
Description: "should return 'consumer-topic' if filter owner email with 'john.doe@email.com'",
Config: discovery.SearchConfig{
Text: "topic",
Filters: map[string][]string{
"owners.email": {"john.doe@email.com"},
},
},
Expected: []expectedRow{
{Type: "topic", RecordID: "consumer-topic"},
},
Expand All @@ -143,6 +159,31 @@ func TestSearcherSearch(t *testing.T) {
{Type: "table", RecordID: "bigquery::gcpproject/dataset/tablename-1"},
},
},
{
Description: "should return consumer-topic if search by query description field with text 'rabbitmq' and owners name 'johndoe'",
Config: discovery.SearchConfig{
Text: "consumer",
Queries: map[string]string{
"description": "rabbitmq",
"owners.name": "john doe",
},
},
Expected: []expectedRow{
{Type: "topic", RecordID: "consumer-topic"},
},
},
{
Description: "should return 'bigquery::gcpproject/dataset/tablename-common' resource on top if search by query table column name field with text 'tablename-common-column1'",
Config: discovery.SearchConfig{
Text: "tablename",
Queries: map[string]string{
"data.schema.columns.name": "common",
},
},
Expected: []expectedRow{
{Type: "table", RecordID: "bigquery::gcpproject/dataset/tablename-common"},
},
},
}
for _, test := range tests {
t.Run(test.Description, func(t *testing.T) {
Expand Down
Loading

0 comments on commit a0c1a23

Please sign in to comment.