feat(discovery): boost search result with table usage (#37)
* feat: boost with table usage if there is filter service bigquery

* feat: add search query param sortby

* fix: rebase to master

* fix(elasticsearch): remove elasticsearch dev logger

* chore: update swagger search api

* fix(search): rename sortby query param to rankby param in search API

* refactor(elasticsearch): simplify query building

Co-authored-by: Stewart Jingga <stewart_jingga@yahoo.com>
mabdh and StewartJingga authored Dec 14, 2021
1 parent ef551e5 commit 695a179
Showing 7 changed files with 283 additions and 84 deletions.
5 changes: 1 addition & 4 deletions api/handlers/search_handler.go
@@ -63,11 +63,8 @@ func (handler *SearchHandler) buildSearchCfg(params url.Values) (cfg discovery.S
cfg.Text = text
cfg.MaxResults, _ = strconv.Atoi(params.Get("size"))
cfg.Filters = filterConfigFromValues(params)
cfg.RankBy = params.Get("rankby")
cfg.TypeWhiteList, err = parseTypeWhiteList(params)
if err != nil {
return
}

return
}

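
The handler change above reads an optional rankby value from the query string and copies it onto the search config, so clients can ask for results to be boosted by a numeric field such as BigQuery table usage. A minimal client-side sketch of building such a request follows; the "size" and "rankby" parameter names come from the handler, while the "text" parameter name and the /v1/search path are assumptions for illustration only.

package main

import (
	"fmt"
	"net/url"
)

func main() {
	params := url.Values{}
	params.Set("text", "bigquery")                   // assumed name of the search-text param
	params.Set("size", "20")                         // max results, read via params.Get("size")
	params.Set("rankby", "data.profile.usage_count") // new param introduced by this change
	fmt.Println("/v1/search?" + params.Encode())     // path is a placeholder
}
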
3 changes: 3 additions & 0 deletions discovery/config.go
@@ -20,4 +20,7 @@ type SearchConfig struct {
// List of record types to search for
// a zero value signifies that all types should be searched
TypeWhiteList []string

// RankBy is the name of a record field used to rank search results
RankBy string
}
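
Callers that use the discovery package directly rather than the HTTP handler request the same ranking by setting RankBy on SearchConfig. A minimal sketch, assuming the repository's discovery and store/elasticsearch packages are imported and that Searcher.Search returns ([]discovery.SearchResult, error), as the truncated signature and toSearchResults further down suggest; the filter key and field path mirror the new test fixture.

// searchRankedByUsage is illustrative only and not part of this change.
func searchRankedByUsage(ctx context.Context, searcher *elasticsearch.Searcher) ([]discovery.SearchResult, error) {
	cfg := discovery.SearchConfig{
		Text:    "bigquery",
		Filters: map[string][]string{"service": {"bigquery"}},
		RankBy:  "data.profile.usage_count",
	}
	return searcher.Search(ctx, cfg)
}
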
49 changes: 40 additions & 9 deletions store/elasticsearch/search.go
@@ -14,8 +14,9 @@ import (
)

var (
defaultMaxResults = 200
defaultMinScore = 0.01
defaultMaxResults = 200
defaultMinScore = 0.01
defaultFunctionScoreQueryScoreMode = "sum"
)

type SearcherConfig struct {
@@ -68,6 +69,7 @@ func (sr *Searcher) Search(ctx context.Context, cfg discovery.SearchConfig) (res
err = errors.Wrap(err, "error building query")
return
}

res, err := sr.cli.Search(
sr.cli.Search.WithBody(query),
sr.cli.Search.WithIndex(indices...),
@@ -120,11 +122,11 @@ func anyValidStringSlice(slices ...[]string) []string {
}

func (sr *Searcher) buildQuery(ctx context.Context, cfg discovery.SearchConfig, indices []string) (io.Reader, error) {
textQuery := sr.buildTextQuery(ctx, cfg.Text)
filterQueries := sr.buildFilterQueries(cfg.Filters)
query := elastic.NewBoolQuery().
Should(textQuery).
Filter(filterQueries...)
var query elastic.Query

query = sr.buildTextQuery(ctx, cfg.Text)
query = sr.buildFilterQueries(query, cfg.Filters)
query = sr.buildFunctionScoreQuery(query, cfg.RankBy)

src, err := query.Source()
if err != nil {
@@ -166,7 +168,12 @@ func (sr *Searcher) buildTextQuery(ctx context.Context, text string) elastic.Que
)
}

func (sr *Searcher) buildFilterQueries(filters map[string][]string) (filterQueries []elastic.Query) {
func (sr *Searcher) buildFilterQueries(query elastic.Query, filters map[string][]string) elastic.Query {
if len(filters) == 0 {
return query
}

var filterQueries []elastic.Query
for key, rawValues := range filters {
if len(rawValues) < 1 {
continue
@@ -182,7 +189,31 @@ func (sr *Searcher) buildFilterQueries(filters map[string][]string) (filterQueri
elastic.NewTermsQuery(key, values...),
)
}
return

newQuery := elastic.NewBoolQuery().
Should(query).
Filter(filterQueries...)

return newQuery
}

func (sr *Searcher) buildFunctionScoreQuery(query elastic.Query, rankBy string) elastic.Query {
if rankBy == "" {
return query
}

factorFunc := elastic.NewFieldValueFactorFunction().
Field(rankBy).
Modifier("log1p").
Missing(1.0).
Weight(1.0)

fsQuery := elastic.NewFunctionScoreQuery().
ScoreMode(defaultFunctionScoreQueryScoreMode).
AddScoreFunc(factorFunc).
Query(query)

return fsQuery
}

func (sr *Searcher) toSearchResults(hits []searchHit) []discovery.SearchResult {
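
Ranking is implemented as an Elasticsearch function_score query: the text-and-filter query is wrapped, and a field_value_factor function boosts each record by log1p of the rankby field (weight 1.0), treating missing values as 1.0 so records without usage data still match, just with a low boost. For the usage counts in the new BigQuery fixture (10, 5 and 1), the log1p boosts order tablename-common ahead of tablename-mid ahead of tablename-1, which is exactly what the new test case asserts. A standalone sketch of those relative boosts, using only the standard library:

package main

import (
	"fmt"
	"math"
)

func main() {
	// Boost contributed by the field_value_factor function (modifier log1p,
	// weight 1.0) for the usage counts in the test fixture. A missing field
	// is scored as if the value were 1.0, i.e. log1p(1) ≈ 0.693.
	for _, usage := range []float64{10, 5, 1} {
		fmt.Printf("usage_count=%2.0f -> log1p=%.3f\n", usage, math.Log1p(usage))
	}
	// usage_count=10 -> log1p=2.398
	// usage_count= 5 -> log1p=1.792
	// usage_count= 1 -> log1p=0.693
}
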
12 changes: 12 additions & 0 deletions store/elasticsearch/search_test.go
@@ -139,6 +139,18 @@ func TestSearch(t *testing.T) {
{Type: "topic", RecordID: "consumer-topic"},
},
},
{
Description: "should return a descendingly sorted based on usage count in search results if rank by usage in the config",
Config: discovery.SearchConfig{
Text: "bigquery",
RankBy: "data.profile.usage_count",
},
Expected: []expectedRow{
{Type: "table", RecordID: "bigquery::gcpproject/dataset/tablename-common"},
{Type: "table", RecordID: "bigquery::gcpproject/dataset/tablename-mid"},
{Type: "table", RecordID: "bigquery::gcpproject/dataset/tablename-1"},
},
},
}
for _, test := range tests {
t.Run(test.Description, func(t *testing.T) {
160 changes: 153 additions & 7 deletions store/elasticsearch/testdata/search-test-fixture.json
@@ -76,9 +76,17 @@
"description": "Transaction records for every microsoft purchase",
"total_rows": 100,
"schema": [
{"name": "id"},
{"name": "username", "description": "purchaser username"},
{"name": "item_id", "description": "item identifications"}
{
"name": "id"
},
{
"name": "username",
"description": "purchaser username"
},
{
"name": "item_id",
"description": "item identifications"
}
]
}
},
@@ -96,12 +104,150 @@
"description": "Transaction records for every Apple purchase",
"total_rows": 100,
"schema": [
{"name": "id"},
{"name": "user_id", "description": "purchaser user idenfitication"},
{"name": "item_id", "description": "item identifications"}
{
"name": "id"
},
{
"name": "user_id",
"description": "purchaser user idenfitication"
},
{
"name": "item_id",
"description": "item identifications"
}
]
}
},
{
"urn": "bigquery::gcpproject/dataset/tablename-1",
"name": "tablename-1",
"service": "bigquery",
"description": "A sample of table record",
"data": {
"preview": {},
"profile": {
"common_join": [
{
"conditions": [
"ON target.column_1 = source.column_1 and target.column_3 = source.column_3 and DATE(target.event_timestamp) = DATE(source.event_timestamp)"
],
"count": 1,
"urn": "bigquery::gcpproject/dataset/tablename-mid"
}
],
"filter_conditions": [
"WHERE t.column_5 = 'success' AND t.item_id = \"280481a2-2384-4b81-aa3e-214ac60b31db\" AND event_timestamp >= TIMESTAMP(\"2021-10-29\", \"UTC\") AND event_timestamp < TIMESTAMP(\"2021-11-22T02:01:06Z\")"
],
"usage_count": 1
},
"properties": {
"attributes": {
"dataset": "dataset",
"full_qualified_name": "gcpproject:dataset.tablename-1",
"partition_field": "event_timestamp",
"project": "gcpproject",
"type": "TABLE"
},
"labels": {
"owner": "user_1"
}
},
"resource": {
"name": "tablename-1",
"service": "bigquery",
"urn": "bigquery::gcpproject/dataset/tablename-1"
}
}
},
{
"urn": "bigquery::gcpproject/dataset/tablename-common",
"name": "tablename-common",
"service": "bigquery",
"description": "A sample of table record with high usage",
"data": {
"preview": {},
"profile": {
"common_join": [
{
"conditions": [
"ON target.column_1 = source.column_1 and target.column_3 = source.column_3 and DATE(target.event_timestamp) = DATE(source.event_timestamp)"
],
"count": 1,
"urn": "bigquery::gcpproject/dataset/tablename-mid"
}
],
"filter_conditions": [
"WHERE t.column_5 = 'success' AND t.item_id = \"280481a2-2384-4b81-aa3e-214ac60b31db\" AND event_timestamp >= TIMESTAMP(\"2021-10-29\", \"UTC\") AND event_timestamp < TIMESTAMP(\"2021-11-22T02:01:06Z\")"
],
"usage_count": 10
},
"properties": {
"attributes": {
"dataset": "dataset",
"full_qualified_name": "gcpproject:dataset.tablename-common",
"partition_field": "event_timestamp",
"project": "gcpproject",
"type": "TABLE"
},
"labels": {
"owner": "user_1"
}
},
"resource": {
"name": "tablename-common",
"service": "bigquery",
"urn": "bigquery::gcpproject/dataset/tablename-common"
}
}
},
{
"urn": "bigquery::gcpproject/dataset/tablename-mid",
"name": "tablename-mid",
"service": "bigquery",
"description": "A sample of table record with mid usage",
"data": {
"preview": {},
"profile": {
"common_join": [
{
"conditions": [
"ON target.column_1 = source.column_1 and target.column_3 = source.column_3 and DATE(target.event_timestamp) = DATE(source.event_timestamp)"
],
"count": 1,
"urn": "bigquery::gcpproject/dataset/tablename-high"
},
{
"conditions": [
"ON target.column_1 = source.column_1 and target.column_3 = source.column_3 and DATE(target.event_timestamp) = DATE(source.event_timestamp)"
],
"count": 1,
"urn": "bigquery::gcpproject/dataset/tablename-1"
}
],
"filter_conditions": [
"WHERE t.column_5 = 'success' AND t.item_id = \"280481a2-2384-4b81-aa3e-214ac60b31db\" AND event_timestamp >= TIMESTAMP(\"2021-10-29\", \"UTC\") AND event_timestamp < TIMESTAMP(\"2021-11-22T02:01:06Z\")"
],
"usage_count": 5
},
"properties": {
"attributes": {
"dataset": "dataset",
"full_qualified_name": "gcpproject:dataset.tablename-mid",
"partition_field": "event_timestamp",
"project": "gcpproject",
"type": "TABLE"
},
"labels": {
"owner": "user_1"
}
},
"resource": {
"name": "tablename-mid",
"service": "bigquery",
"urn": "bigquery::gcpproject/dataset/tablename-mid"
}
}
}
]
}
]
]
6 changes: 6 additions & 0 deletions store/elasticsearch/testutil/elastic_search.go
@@ -94,6 +94,12 @@ func NewElasticsearchTestServer() *ElasticsearchTestServer {
Addresses: []string{
server.url.String(),
},
// uncomment the code below to log requests and responses to Elasticsearch for debugging
// Logger: &estransport.ColorLogger{
// Output: os.Stdout,
// EnableRequestBody: true,
// EnableResponseBody: true,
// },
},
)
if err != nil {
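
The commented-out logger in the test helper is a debugging aid: enabling it makes the go-elasticsearch client print every request and response it exchanges with the test server. A sketch of a client with that logger switched on, assuming the v7 go-elasticsearch client and its estransport package; the address is a placeholder.

package main

import (
	"os"

	elasticsearch7 "github.com/elastic/go-elasticsearch/v7"
	"github.com/elastic/go-elasticsearch/v7/estransport"
)

func main() {
	// Same logger as the commented block above, enabled on a client.
	cli, err := elasticsearch7.NewClient(elasticsearch7.Config{
		Addresses: []string{"http://localhost:9200"}, // placeholder address
		Logger: &estransport.ColorLogger{
			Output:             os.Stdout,
			EnableRequestBody:  true,
			EnableResponseBody: true,
		},
	})
	if err != nil {
		panic(err)
	}
	_ = cli
}
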