From 1eb6f7800c574a91a92f122ef8e93b4bbe2874f0 Mon Sep 17 00:00:00 2001 From: Stewart Jingga Date: Mon, 27 Dec 2021 11:50:51 +0700 Subject: [PATCH 1/3] feat(records): add pagination when getting type's records --- api/handlers/record_handler.go | 33 ++- api/handlers/record_handler_test.go | 130 +++++---- discovery/config.go | 29 -- discovery/model.go | 58 ++++ discovery/repo.go | 8 +- lib/mock/mocks.go | 6 +- store/elasticsearch/es.go | 3 +- store/elasticsearch/record_repository.go | 249 +++++++++--------- store/elasticsearch/record_repository_test.go | 37 ++- store/elasticsearch/testdata/records-all.json | 130 +++++++++ store/elasticsearch/testdata/records-id.json | 20 +- .../testdata/records-offset.json | 40 +++ .../elasticsearch/testdata/records-vn-id.json | 50 ++-- store/elasticsearch/type_repository.go | 4 + store/elasticsearch/utils.go | 8 - 15 files changed, 530 insertions(+), 275 deletions(-) delete mode 100644 discovery/config.go create mode 100644 store/elasticsearch/testdata/records-all.json create mode 100644 store/elasticsearch/testdata/records-offset.json delete mode 100644 store/elasticsearch/utils.go diff --git a/api/handlers/record_handler.go b/api/handlers/record_handler.go index 3207a9e0..70d9f49d 100644 --- a/api/handlers/record_handler.go +++ b/api/handlers/record_handler.go @@ -5,6 +5,8 @@ import ( "errors" "fmt" "net/http" + "net/url" + "strconv" "strings" "github.com/gorilla/mux" @@ -140,9 +142,13 @@ func (h *RecordHandler) GetByType(w http.ResponseWriter, r *http.Request) { writeJSONError(w, status, message) return } - filterCfg := filterConfigFromValues(r.URL.Query()) + getCfg, err := h.buildGetConfig(r.URL.Query()) + if err != nil { + writeJSONError(w, http.StatusBadRequest, err.Error()) + return + } - records, err := recordRepo.GetAll(r.Context(), filterCfg) + recordList, err := recordRepo.GetAll(r.Context(), getCfg) if err != nil { h.logger.WithField("type", t.Name). Errorf("error fetching records: GetAll: %v", err) @@ -153,9 +159,9 @@ func (h *RecordHandler) GetByType(w http.ResponseWriter, r *http.Request) { fieldsToSelect := h.parseSelectQuery(r.URL.Query().Get("select")) if len(fieldsToSelect) > 0 { - records = h.selectRecordFields(fieldsToSelect, records) + recordList.Data = h.selectRecordFields(fieldsToSelect, recordList.Data) } - writeJSON(w, http.StatusOK, records) + writeJSON(w, http.StatusOK, recordList) } func (h *RecordHandler) GetOneByType(w http.ResponseWriter, r *http.Request) { @@ -198,6 +204,25 @@ func (h *RecordHandler) GetOneByType(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, record) } +func (h *RecordHandler) buildGetConfig(params url.Values) (cfg discovery.GetConfig, err error) { + if size := params.Get("size"); size != "" { + cfg.Size, err = strconv.Atoi(size) + if err != nil { + return + } + } + if from := params.Get("from"); from != "" { + cfg.From, err = strconv.Atoi(from) + if err != nil { + return + } + } + + cfg.Filters = filterConfigFromValues(params) + + return +} + func (h *RecordHandler) parseSelectQuery(raw string) (fields []string) { tokens := strings.Split(raw, ",") for _, token := range tokens { diff --git a/api/handlers/record_handler_test.go b/api/handlers/record_handler_test.go index 073eac14..bd299f85 100644 --- a/api/handlers/record_handler_test.go +++ b/api/handlers/record_handler_test.go @@ -337,13 +337,17 @@ func TestRecordHandler(t *testing.T) { Setup: func(tc *testCase, rrf *mock.RecordRepositoryFactory) {}, }, { - Description: "should return an http 200 irrespective of environment value", + Description: "should get from and size from querystring and pass it to repo", Type: typeName, - QueryStrings: "filter.data.environment=nonexisting", + QueryStrings: "from=5&size=10", ExpectStatus: http.StatusOK, Setup: func(tc *testCase, rrf *mock.RecordRepositoryFactory) { rr := new(mock.RecordRepository) - rr.On("GetAll", ctx, map[string][]string{"data.environment": {"nonexisting"}}).Return(records, nil) + rr.On("GetAll", ctx, discovery.GetConfig{ + Filters: map[string][]string{}, + From: 5, + Size: 10, + }).Return(discovery.RecordList{Data: records}, nil) rrf.On("For", typeName).Return(rr, nil) }, }, @@ -354,32 +358,63 @@ func TestRecordHandler(t *testing.T) { ExpectStatus: http.StatusOK, Setup: func(tc *testCase, rrf *mock.RecordRepositoryFactory) { rr := new(mock.RecordRepository) - rr.On("GetAll", ctx, map[string][]string{ - "service": {"kafka", "rabbitmq"}, - "data.company": {"appel"}, - }).Return(records, nil) + rr.On("GetAll", ctx, discovery.GetConfig{ + Filters: map[string][]string{ + "service": {"kafka", "rabbitmq"}, + "data.company": {"appel"}, + }}).Return(discovery.RecordList{Data: records}, nil) rrf.On("For", typeName).Return(rr, nil) }, }, { - Description: "should return all records for an type", + Description: "should return http 500 if the handler fails to construct record repository", Type: typeName, QueryStrings: "filter.data.environment=test", + ExpectStatus: http.StatusInternalServerError, + Setup: func(tc *testCase, rrf *mock.RecordRepositoryFactory) { + rr := new(mock.RecordRepository) + err := fmt.Errorf("something went wrong") + rrf.On("For", typeName).Return(rr, err) + }, + }, + { + Description: "should return an http 500 if calling recordRepository.GetAll fails", + Type: typeName, + QueryStrings: "filter.data.environment=test", + ExpectStatus: http.StatusInternalServerError, + Setup: func(tc *testCase, rrf *mock.RecordRepositoryFactory) { + rr := new(mock.RecordRepository) + err := fmt.Errorf("temporarily unavailable") + rr.On("GetAll", ctx, discovery.GetConfig{ + Filters: map[string][]string{"data.environment": {"test"}}, + }).Return(discovery.RecordList{Data: []record.Record{}}, err) + rrf.On("For", typeName).Return(rr, nil) + }, + }, + { + Description: "should return 200 on success and RecordList", + Type: typeName, ExpectStatus: http.StatusOK, Setup: func(tc *testCase, rrf *mock.RecordRepositoryFactory) { rr := new(mock.RecordRepository) - rr.On("GetAll", ctx, map[string][]string{"data.environment": {"test"}}).Return(records, nil) + rr.On("GetAll", ctx, discovery.GetConfig{ + Filters: map[string][]string{}, + }).Return(discovery.RecordList{Data: records}, nil) rrf.On("For", typeName).Return(rr, nil) }, PostCheck: func(tc *testCase, resp *http.Response) error { - var response []record.Record + var response discovery.RecordList err := json.NewDecoder(resp.Body).Decode(&response) if err != nil { return fmt.Errorf("error parsing response payload: %v", err) } - // TODO: more useful error messages - if reflect.DeepEqual(response, records) == false { - return fmt.Errorf("expected handler to return %v, returned %v instead", records, response) + + expected := discovery.RecordList{ + Data: records, + } + + if reflect.DeepEqual(response, expected) == false { + return fmt.Errorf("expected handler to return %v, returned %v instead", expected, response) } return nil }, @@ -391,63 +426,44 @@ func TestRecordHandler(t *testing.T) { ExpectStatus: http.StatusOK, Setup: func(tc *testCase, rrf *mock.RecordRepositoryFactory) { rr := new(mock.RecordRepository) - rr.On("GetAll", ctx, map[string][]string{"data.environment": {"test"}}).Return(records, nil) + rr.On("GetAll", ctx, discovery.GetConfig{ + Filters: map[string][]string{"data.environment": {"test"}}, + }).Return(discovery.RecordList{Data: records}, nil) rrf.On("For", typeName).Return(rr, nil) }, PostCheck: func(tc *testCase, resp *http.Response) error { - var expectRecords = []record.Record{ - { - Urn: "test-fh-1", - Data: map[string]interface{}{ - "urn": "test-fh-1", - "owner": "de", - }, - }, - { - Urn: "test-fh-2", - Data: map[string]interface{}{ - "urn": "test-fh-2", - "owner": "de", - }, - }, - } - - var response []record.Record + var response discovery.RecordList err := json.NewDecoder(resp.Body).Decode(&response) if err != nil { return fmt.Errorf("error parsing response payload: %v", err) } - if reflect.DeepEqual(response, expectRecords) == false { - return fmt.Errorf("expected handler to return %v, returned %v instead", expectRecords, response) + expected := discovery.RecordList{ + Data: []record.Record{ + { + Urn: "test-fh-1", + Data: map[string]interface{}{ + "urn": "test-fh-1", + "owner": "de", + }, + }, + { + Urn: "test-fh-2", + Data: map[string]interface{}{ + "urn": "test-fh-2", + "owner": "de", + }, + }, + }, + } + + if reflect.DeepEqual(response, expected) == false { + return fmt.Errorf("expected handler to return %v, returned %v instead", expected, response) } return nil }, }, - { - Description: "(internal) should return http 500 if the handler fails to construct record repository", - Type: typeName, - QueryStrings: "filter.data.environment=test", - ExpectStatus: http.StatusInternalServerError, - Setup: func(tc *testCase, rrf *mock.RecordRepositoryFactory) { - rr := new(mock.RecordRepository) - err := fmt.Errorf("something went wrong") - rrf.On("For", typeName).Return(rr, err) - }, - }, - { - Description: "(internal) should return an http 500 if calling recordRepository.GetAll fails", - Type: typeName, - QueryStrings: "filter.data.environment=test", - ExpectStatus: http.StatusInternalServerError, - Setup: func(tc *testCase, rrf *mock.RecordRepositoryFactory) { - rr := new(mock.RecordRepository) - err := fmt.Errorf("temporarily unavailable") - rr.On("GetAll", ctx, map[string][]string{"data.environment": {"test"}}).Return([]record.Record{}, err) - rrf.On("For", typeName).Return(rr, nil) - }, - }, } for _, tc := range testCases { t.Run(tc.Description, func(t *testing.T) { diff --git a/discovery/config.go b/discovery/config.go deleted file mode 100644 index a7bb538e..00000000 --- a/discovery/config.go +++ /dev/null @@ -1,29 +0,0 @@ -package discovery - -// RecordFilter is a filter intended to be used as a search -// criteria for operations involving record search -type RecordFilter = map[string][]string - -// SearchConfig represents a search query along -// with any corresponding filter(s) -type SearchConfig struct { - // Text to search for - Text string - - // Filters specifies document level values to look for. - // Multiple values can be specified for a single key - Filters RecordFilter - - // Number of relevant results to return - MaxResults int - - // List of record types to search for - // a zero value signifies that all types should be searched - TypeWhiteList []string - - // RankBy is a param to rank based on a specific parameter - RankBy string - - // Queries is a param to search a resource based on record's fields - Queries map[string]string -} diff --git a/discovery/model.go b/discovery/model.go index 1efe38de..2388efd0 100644 --- a/discovery/model.go +++ b/discovery/model.go @@ -1,5 +1,12 @@ package discovery +import "github.com/odpf/columbus/record" + +// RecordFilter is a filter intended to be used as a search +// criteria for operations involving record search +type RecordFilter = map[string][]string + +// SearchResult represents an item/result in a list of search results type SearchResult struct { ID string `json:"id"` Title string `json:"title"` @@ -8,3 +15,54 @@ type SearchResult struct { Description string `json:"description"` Labels map[string]string `json:"labels"` } + +// SearchConfig represents a search query along +// with any corresponding filter(s) +type SearchConfig struct { + // Text to search for + Text string + + // Filters specifies document level values to look for. + // Multiple values can be specified for a single key + Filters RecordFilter + + // Number of relevant results to return + MaxResults int + + // List of record types to search for + // a zero value signifies that all types should be searched + TypeWhiteList []string + + // RankBy is a param to rank based on a specific parameter + RankBy string + + // Queries is a param to search a resource based on record's fields + Queries map[string]string +} + +// GetConfig represents a get query along +// with any corresponding filter(s) +type GetConfig struct { + // Filters specifies document level values to look for. + // Multiple values can be specified for a single key + Filters RecordFilter + + // Number of relevant results to return + Size int + + // Offset to fetch records from + From int +} + +// RecordList is a struct that wraps list of records with total +type RecordList struct { + // Data contains list of fetched records + Data []record.Record `json:"data"` + + // Count is the length of Data + Count int `json:"count"` + + // Total is the total of available data in the repository + // It also includes those that are not fetched + Total int `json:"total"` +} diff --git a/discovery/repo.go b/discovery/repo.go index 6c42bc9d..c57b5f80 100644 --- a/discovery/repo.go +++ b/discovery/repo.go @@ -17,9 +17,8 @@ type RecordRepository interface { CreateOrReplaceMany(context.Context, []record.Record) error // GetAll returns specific records from storage - // RecordFilter is an optional data structure that is - // used for return documents matching the search criteria. - GetAll(context.Context, RecordFilter) ([]record.Record, error) + // GetConfig is used to configure fetching such as filters and offset + GetAll(ctx context.Context, cfg GetConfig) (RecordList, error) // GetAllIterator returns RecordIterator to iterate records by batches GetAllIterator(context.Context) (RecordIterator, error) @@ -33,9 +32,6 @@ type RecordRepository interface { // The field that contains this ID is defined by the // type to which this record belongs Delete(context.Context, string) error - - // TODO: we should probably switch to iterator types for returning - // records, or we could add options for pagination } // RecordRepositoryFactory represents a type capable diff --git a/lib/mock/mocks.go b/lib/mock/mocks.go index d0a886e1..4e64efb3 100644 --- a/lib/mock/mocks.go +++ b/lib/mock/mocks.go @@ -53,9 +53,9 @@ func (repo *RecordRepository) CreateOrReplaceMany(ctx context.Context, records [ return args.Error(0) } -func (repo *RecordRepository) GetAll(ctx context.Context, filter discovery.RecordFilter) ([]record.Record, error) { - args := repo.Called(ctx, filter) - return args.Get(0).([]record.Record), args.Error(1) +func (repo *RecordRepository) GetAll(ctx context.Context, cfg discovery.GetConfig) (discovery.RecordList, error) { + args := repo.Called(ctx, cfg) + return args.Get(0).(discovery.RecordList), args.Error(1) } func (repo *RecordRepository) GetAllIterator(ctx context.Context) (discovery.RecordIterator, error) { diff --git a/store/elasticsearch/es.go b/store/elasticsearch/es.go index 14191cc3..aeac6b1b 100644 --- a/store/elasticsearch/es.go +++ b/store/elasticsearch/es.go @@ -27,7 +27,8 @@ type searchHit struct { type searchResponse struct { ScrollID string `json:"_scroll_id"` Hits struct { - Hits []searchHit `json:"hits"` + Total elastic.TotalHits `json:"total"` + Hits []searchHit `json:"hits"` } `json:"hits"` Suggest map[string][]struct { Text string `json:"text"` diff --git a/store/elasticsearch/record_repository.go b/store/elasticsearch/record_repository.go index fb05f7fd..285e5764 100644 --- a/store/elasticsearch/record_repository.go +++ b/store/elasticsearch/record_repository.go @@ -17,9 +17,10 @@ import ( "github.com/olivere/elastic/v7" ) -type getResponse struct { - Source record.Record `json:"_source"` -} +const ( + defaultGetSize = 20 + defaultSortField = "name.keyword" +) // RecordRepository implements discovery.RecordRepository // with elasticsearch as the backing store. @@ -28,55 +29,6 @@ type RecordRepository struct { cli *elasticsearch.Client } -func (repo *RecordRepository) CreateOrReplaceMany(ctx context.Context, records []record.Record) error { - requestPayload, err := repo.createBulkInsertPayload(records) - if err != nil { - return fmt.Errorf("error serialising payload: %w", err) - } - res, err := repo.cli.Bulk( - requestPayload, - repo.cli.Bulk.WithRefresh("true"), - repo.cli.Bulk.WithContext(ctx), - ) - if err != nil { - return elasticSearchError(err) - } - defer res.Body.Close() - if res.IsError() { - return fmt.Errorf("error response from elasticsearch: %s", errorReasonFromResponse(res)) - } - return nil -} - -func (repo *RecordRepository) createBulkInsertPayload(records []record.Record) (io.Reader, error) { - payload := bytes.NewBuffer(nil) - for _, record := range records { - err := repo.writeInsertAction(payload, record) - if err != nil { - return nil, fmt.Errorf("createBulkInsertPayload: %w", err) - } - err = json.NewEncoder(payload).Encode(record) - if err != nil { - return nil, fmt.Errorf("error serialising record: %w", err) - } - } - return payload, nil -} - -func (repo *RecordRepository) writeInsertAction(w io.Writer, record record.Record) error { - if strings.TrimSpace(record.Urn) == "" { - return fmt.Errorf("URN record field cannot be empty") - } - type obj map[string]interface{} - action := obj{ - "index": obj{ - "_index": repo.typeName, - "_id": record.Urn, - }, - } - return json.NewEncoder(w).Encode(action) -} - func (repo *RecordRepository) GetAllIterator(ctx context.Context) (discovery.RecordIterator, error) { body, err := repo.getAllQuery(discovery.RecordFilter{}) if err != nil { @@ -112,48 +64,152 @@ func (repo *RecordRepository) GetAllIterator(ctx context.Context) (discovery.Rec return &it, nil } -func (repo *RecordRepository) GetAll(ctx context.Context, filters discovery.RecordFilter) ([]record.Record, error) { +func (repo *RecordRepository) GetAll(ctx context.Context, cfg discovery.GetConfig) (recordList discovery.RecordList, err error) { // XXX(Aman): we should probably think about result ordering, if the client // is going to slice the data for pagination. Does ES guarantee the result order? - body, err := repo.getAllQuery(filters) + body, err := repo.getAllQuery(cfg.Filters) if err != nil { - return nil, fmt.Errorf("error building search query: %w", err) + err = fmt.Errorf("error building search query: %w", err) + return + } + size := cfg.Size + if size == 0 { + size = defaultGetSize } resp, err := repo.cli.Search( repo.cli.Search.WithIndex(repo.typeName), repo.cli.Search.WithBody(body), - repo.cli.Search.WithScroll(defaultScrollTimeout), - repo.cli.Search.WithSize(defaultScrollBatchSize), + repo.cli.Search.WithFrom(cfg.From), + repo.cli.Search.WithSize(size), + repo.cli.Search.WithSort(defaultSortField), repo.cli.Search.WithContext(ctx), ) if err != nil { - return nil, fmt.Errorf("error executing search: %w", err) + err = fmt.Errorf("error executing search: %w", err) + return } defer resp.Body.Close() if resp.IsError() { - return nil, fmt.Errorf("error response from elasticsearch: %s", errorReasonFromResponse(resp)) + err = fmt.Errorf("error response from elasticsearch: %s", errorReasonFromResponse(resp)) + return } var response searchResponse err = json.NewDecoder(resp.Body).Decode(&response) if err != nil { - return nil, fmt.Errorf("error decoding es response: %w", err) + err = fmt.Errorf("error decoding es response: %w", err) + return } - var results = repo.toRecordList(response) - var scrollID = response.ScrollID - for { - var nextResults []record.Record - nextResults, scrollID, err = repo.scrollRecords(ctx, scrollID) + var records = repo.toRecordList(response) + + recordList.Data = records + recordList.Count = len(records) + recordList.Total = int(response.Hits.Total.Value) + + return +} + +func (repo *RecordRepository) CreateOrReplaceMany(ctx context.Context, records []record.Record) error { + requestPayload, err := repo.createBulkInsertPayload(records) + if err != nil { + return fmt.Errorf("error serialising payload: %w", err) + } + res, err := repo.cli.Bulk( + requestPayload, + repo.cli.Bulk.WithRefresh("true"), + repo.cli.Bulk.WithContext(ctx), + ) + if err != nil { + return elasticSearchError(err) + } + defer res.Body.Close() + if res.IsError() { + return fmt.Errorf("error response from elasticsearch: %s", errorReasonFromResponse(res)) + } + return nil +} + +func (repo *RecordRepository) GetByID(ctx context.Context, id string) (r record.Record, err error) { + res, err := repo.cli.Get( + repo.typeName, + url.PathEscape(id), + repo.cli.Get.WithContext(ctx), + ) + if err != nil { + err = fmt.Errorf("error executing get: %w", err) + return + } + defer res.Body.Close() + + if res.IsError() { + if res.StatusCode == http.StatusNotFound { + err = record.ErrNoSuchRecord{RecordID: id} + return + } + err = fmt.Errorf("got %s response from elasticsearch: %s", res.Status(), res) + return + } + + var response searchHit + err = json.NewDecoder(res.Body).Decode(&response) + if err != nil { + err = fmt.Errorf("error parsing response: %w", err) + return + } + + r = response.Source + return +} + +func (repo *RecordRepository) Delete(ctx context.Context, id string) error { + res, err := repo.cli.Delete( + repo.typeName, + url.PathEscape(id), + repo.cli.Delete.WithRefresh("true"), + repo.cli.Delete.WithContext(ctx), + ) + if err != nil { + return fmt.Errorf("error deleting record: %w", err) + } + defer res.Body.Close() + if res.IsError() { + if res.StatusCode == http.StatusNotFound { + return record.ErrNoSuchRecord{RecordID: id} + } + return fmt.Errorf("error response from elasticsearch: %s", errorReasonFromResponse(res)) + } + + return nil +} + +func (repo *RecordRepository) createBulkInsertPayload(records []record.Record) (io.Reader, error) { + payload := bytes.NewBuffer(nil) + for _, record := range records { + err := repo.writeInsertAction(payload, record) if err != nil { - return nil, fmt.Errorf("error scrolling results: %v", err) + return nil, fmt.Errorf("createBulkInsertPayload: %w", err) } - if len(nextResults) == 0 { - break + err = json.NewEncoder(payload).Encode(record) + if err != nil { + return nil, fmt.Errorf("error serialising record: %w", err) } - results = append(results, nextResults...) } - return results, nil + return payload, nil +} + +func (repo *RecordRepository) writeInsertAction(w io.Writer, record record.Record) error { + if strings.TrimSpace(record.Urn) == "" { + return fmt.Errorf("URN record field cannot be empty") + } + type obj map[string]interface{} + action := obj{ + "index": obj{ + "_index": repo.typeName, + "_id": record.Urn, + }, + } + return json.NewEncoder(w).Encode(action) } func (repo *RecordRepository) scrollRecords(ctx context.Context, scrollID string) ([]record.Record, string, error) { @@ -223,59 +279,6 @@ func (repo *RecordRepository) termsQuery(filters discovery.RecordFilter) (io.Rea return payload, json.NewEncoder(payload).Encode(raw) } -func (repo *RecordRepository) GetByID(ctx context.Context, id string) (r record.Record, err error) { - res, err := repo.cli.Get( - repo.typeName, - url.PathEscape(id), - repo.cli.Get.WithContext(ctx), - ) - if err != nil { - err = fmt.Errorf("error executing get: %w", err) - return - } - defer res.Body.Close() - - if res.IsError() { - if res.StatusCode == http.StatusNotFound { - err = record.ErrNoSuchRecord{RecordID: id} - return - } - err = fmt.Errorf("got %s response from elasticsearch: %s", res.Status(), res) - return - } - - var response getResponse - err = json.NewDecoder(res.Body).Decode(&response) - if err != nil { - err = fmt.Errorf("error parsing response: %w", err) - return - } - - r = response.Source - return -} - -func (repo *RecordRepository) Delete(ctx context.Context, id string) error { - res, err := repo.cli.Delete( - repo.typeName, - url.PathEscape(id), - repo.cli.Delete.WithRefresh("true"), - repo.cli.Delete.WithContext(ctx), - ) - if err != nil { - return fmt.Errorf("error deleting record: %w", err) - } - defer res.Body.Close() - if res.IsError() { - if res.StatusCode == http.StatusNotFound { - return record.ErrNoSuchRecord{RecordID: id} - } - return fmt.Errorf("error response from elasticsearch: %s", errorReasonFromResponse(res)) - } - - return nil -} - // recordIterator is the internal implementation of record.RecordIterator by RecordRepository type recordIterator struct { resp *esapi.Response diff --git a/store/elasticsearch/record_repository_test.go b/store/elasticsearch/record_repository_test.go index 4914357d..c8db4d58 100644 --- a/store/elasticsearch/record_repository_test.go +++ b/store/elasticsearch/record_repository_test.go @@ -159,16 +159,27 @@ func TestRecordRepository(t *testing.T) { }) t.Run("GetAll", func(t *testing.T) { type testCase struct { - Description string - Filter discovery.RecordFilter - ResultsFile string + Description string + Filter discovery.RecordFilter + ResultsFile string + From int + Size int + ExpectedTotal int } var testCases = []testCase{ { - Description: "should handle nil filter", + Description: "should handle nil filter and default sort by name", Filter: nil, - ResultsFile: "./testdata/records.json", + ResultsFile: "./testdata/records-all.json", + }, + { + Description: "should fetch certain offset and size if given", + Filter: nil, + From: 2, + Size: 3, + ResultsFile: "./testdata/records-offset.json", + ExpectedTotal: 10, }, { Description: "should handle filter by service", @@ -215,17 +226,25 @@ func TestRecordRepository(t *testing.T) { return } - actualResults, err := recordRepo.GetAll(ctx, tc.Filter) + recordList, err := recordRepo.GetAll(ctx, discovery.GetConfig{ + Filters: tc.Filter, + From: tc.From, + Size: tc.Size, + }) if err != nil { t.Fatalf("error executing GetAll: %v", err) return } - assert.Equal(t, len(expectedResults), len(actualResults)) - if reflect.DeepEqual(expectedResults, actualResults) == false { - t.Error(incorrectResultsError(expectedResults, actualResults)) + assert.Equal(t, len(expectedResults), recordList.Count) + if reflect.DeepEqual(expectedResults, recordList.Data) == false { + t.Error(incorrectResultsError(expectedResults, recordList.Data)) return } + + if tc.ExpectedTotal > 0 { + assert.Equal(t, tc.ExpectedTotal, recordList.Total) + } }) } }) diff --git a/store/elasticsearch/testdata/records-all.json b/store/elasticsearch/testdata/records-all.json new file mode 100644 index 00000000..d5f524ec --- /dev/null +++ b/store/elasticsearch/testdata/records-all.json @@ -0,0 +1,130 @@ +[ + { + "urn": "c-demo-kafka", + "name": "demo-kafka", + "service": "kafka", + "description": "", + "data": { + "urn": "c-demo-kafka", + "title": "demo-kafka", + "description": "", + "entity": "odpf", + "country": "id" + } + }, + { + "urn": "i-undefined-dfgdgd-avi", + "name": "dfgdgd-avi", + "service": "rabbitmq", + "description": "", + "data": { + "urn": "i-undefined-dfgdgd-avi", + "title": "dfgdgd-avi", + "description": "", + "entity": "odpf", + "country": "id" + } + }, + { + "urn": "g-jane-kafka-1a", + "name": "jane-kafka-1a", + "description": "", + "data": { + "urn": "g-jane-kafka-1a", + "title": "jane-kafka-1a", + "description": "", + "entity": "odpf", + "country": "vn" + } + }, + { + "urn": "f-john-test-001", + "name": "john_test_001", + "service": "kafka", + "description": "", + "data": { + "urn": "f-john-test-001", + "title": "john_test_001", + "description": "", + "entity": "odpf", + "country": "id" + } + }, + { + "urn": "a-kafka-001", + "name": "kafka-001", + "service": "kafka", + "description": "", + "data": { + "urn": "a-kafka-001", + "title": "kafka-001", + "description": "", + "entity": "odpf", + "country": "vn" + } + }, + { + "urn": "d-test-abcsasa", + "name": "test-abcsasa", + "service": "kafka", + "description": "", + "data": { + "urn": "d-test-abcsasa", + "title": "test-abcsasa", + "description": "", + "entity": "odpf", + "country": "vn" + } + }, + { + "urn": "b-test-kafka", + "name": "test-kafka", + "service": "kafka", + "description": "", + "data": { + "urn": "b-test-kafka", + "title": "test-kafka", + "description": "", + "entity": "odpf", + "country": "th" + } + }, + { + "urn": "h-test-new-kafka-1-a-b-kafka", + "name": "test-new-kafka-1-a-b", + "description": "", + "data": { + "urn": "h-test-new-kafka-1-a-b-kafka", + "title": "test-new-kafka-1-a-b", + "description": "", + "entity": "odpf", + "country": "th" + } + }, + { + "urn": "e-test-grant2", + "name": "test_grant2", + "service": "kafka", + "description": "", + "data": { + "urn": "e-test-grant2", + "title": "test_grant2", + "description": "", + "entity": "odpf", + "country": "th" + } + }, + { + "urn": "j-xcvcx", + "name": "xcvcx", + "service": "rabbitmq", + "description": "", + "data": { + "urn": "j-xcvcx", + "title": "xcvcx", + "description": "", + "entity": "odpf", + "country": "vn" + } + } +] \ No newline at end of file diff --git a/store/elasticsearch/testdata/records-id.json b/store/elasticsearch/testdata/records-id.json index 7c4e7ebd..b25bdda9 100644 --- a/store/elasticsearch/testdata/records-id.json +++ b/store/elasticsearch/testdata/records-id.json @@ -13,26 +13,26 @@ } }, { - "urn": "f-john-test-001", - "name": "john_test_001", - "service": "kafka", + "urn": "i-undefined-dfgdgd-avi", + "name": "dfgdgd-avi", + "service": "rabbitmq", "description": "", "data": { - "urn": "f-john-test-001", - "title": "john_test_001", + "urn": "i-undefined-dfgdgd-avi", + "title": "dfgdgd-avi", "description": "", "entity": "odpf", "country": "id" } }, { - "urn": "i-undefined-dfgdgd-avi", - "name": "dfgdgd-avi", - "service": "rabbitmq", + "urn": "f-john-test-001", + "name": "john_test_001", + "service": "kafka", "description": "", "data": { - "urn": "i-undefined-dfgdgd-avi", - "title": "dfgdgd-avi", + "urn": "f-john-test-001", + "title": "john_test_001", "description": "", "entity": "odpf", "country": "id" diff --git a/store/elasticsearch/testdata/records-offset.json b/store/elasticsearch/testdata/records-offset.json new file mode 100644 index 00000000..3ae57fd5 --- /dev/null +++ b/store/elasticsearch/testdata/records-offset.json @@ -0,0 +1,40 @@ +[ + { + "urn": "g-jane-kafka-1a", + "name": "jane-kafka-1a", + "description": "", + "data": { + "urn": "g-jane-kafka-1a", + "title": "jane-kafka-1a", + "description": "", + "entity": "odpf", + "country": "vn" + } + }, + { + "urn": "f-john-test-001", + "name": "john_test_001", + "service": "kafka", + "description": "", + "data": { + "urn": "f-john-test-001", + "title": "john_test_001", + "description": "", + "entity": "odpf", + "country": "id" + } + }, + { + "urn": "a-kafka-001", + "name": "kafka-001", + "service": "kafka", + "description": "", + "data": { + "urn": "a-kafka-001", + "title": "kafka-001", + "description": "", + "entity": "odpf", + "country": "vn" + } + } +] diff --git a/store/elasticsearch/testdata/records-vn-id.json b/store/elasticsearch/testdata/records-vn-id.json index d9f19000..180ae575 100644 --- a/store/elasticsearch/testdata/records-vn-id.json +++ b/store/elasticsearch/testdata/records-vn-id.json @@ -1,38 +1,37 @@ [ { - "urn": "a-kafka-001", - "name": "kafka-001", + "urn": "c-demo-kafka", + "name": "demo-kafka", "service": "kafka", "description": "", "data": { - "urn": "a-kafka-001", - "title": "kafka-001", + "urn": "c-demo-kafka", + "title": "demo-kafka", "description": "", "entity": "odpf", - "country": "vn" + "country": "id" } }, { - "urn": "c-demo-kafka", - "name": "demo-kafka", - "service": "kafka", + "urn": "i-undefined-dfgdgd-avi", + "name": "dfgdgd-avi", + "service": "rabbitmq", "description": "", "data": { - "urn": "c-demo-kafka", - "title": "demo-kafka", + "urn": "i-undefined-dfgdgd-avi", + "title": "dfgdgd-avi", "description": "", "entity": "odpf", "country": "id" } }, { - "urn": "d-test-abcsasa", - "name": "test-abcsasa", - "service": "kafka", + "urn": "g-jane-kafka-1a", + "name": "jane-kafka-1a", "description": "", "data": { - "urn": "d-test-abcsasa", - "title": "test-abcsasa", + "urn": "g-jane-kafka-1a", + "title": "jane-kafka-1a", "description": "", "entity": "odpf", "country": "vn" @@ -52,28 +51,29 @@ } }, { - "urn": "g-jane-kafka-1a", - "name": "jane-kafka-1a", + "urn": "a-kafka-001", + "name": "kafka-001", + "service": "kafka", "description": "", "data": { - "urn": "g-jane-kafka-1a", - "title": "jane-kafka-1a", + "urn": "a-kafka-001", + "title": "kafka-001", "description": "", "entity": "odpf", "country": "vn" } }, { - "urn": "i-undefined-dfgdgd-avi", - "name": "dfgdgd-avi", - "service": "rabbitmq", + "urn": "d-test-abcsasa", + "name": "test-abcsasa", + "service": "kafka", "description": "", "data": { - "urn": "i-undefined-dfgdgd-avi", - "title": "dfgdgd-avi", + "urn": "d-test-abcsasa", + "title": "test-abcsasa", "description": "", "entity": "odpf", - "country": "id" + "country": "vn" } }, { diff --git a/store/elasticsearch/type_repository.go b/store/elasticsearch/type_repository.go index d2680f4c..1a34a41d 100644 --- a/store/elasticsearch/type_repository.go +++ b/store/elasticsearch/type_repository.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "strings" + "time" "github.com/elastic/go-elasticsearch/v7" "github.com/odpf/columbus/record" @@ -20,6 +21,9 @@ const ( // name of the search index defaultSearchIndex = "universe" + + defaultScrollTimeout = 30 * time.Second + defaultScrollBatchSize = 20 ) func isReservedName(name string) bool { diff --git a/store/elasticsearch/utils.go b/store/elasticsearch/utils.go deleted file mode 100644 index 366ef76f..00000000 --- a/store/elasticsearch/utils.go +++ /dev/null @@ -1,8 +0,0 @@ -package elasticsearch - -import "time" - -const ( - defaultScrollTimeout = 30 * time.Second - defaultScrollBatchSize = 1000 -) From db69ca52283296a77c25dfe307b835a6fc88f00d Mon Sep 17 00:00:00 2001 From: Stewart Jingga Date: Mon, 27 Dec 2021 11:51:05 +0700 Subject: [PATCH 2/3] fix(tags): inconsistent error --- tag/validator/error_test.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/tag/validator/error_test.go b/tag/validator/error_test.go index f6c183a9..5934b995 100644 --- a/tag/validator/error_test.go +++ b/tag/validator/error_test.go @@ -10,15 +10,10 @@ func TestFieldErrorAsString(t *testing.T) { key1 := "key1 error" message1 := "message1 error" - key2 := "key2 error" - message2 := "message2 error" - err := FieldError{ key1: message1, - key2: message2, } - - expectedString := "error with [key1 error : message1 error, key2 error : message2 error]" + expectedString := "error with [key1 error : message1 error]" assert.Equal(t, expectedString, err.Error()) } From eb56f89b5ac777c887632ddd30eb7bfc0f4d9064 Mon Sep 17 00:00:00 2001 From: Stewart Jingga Date: Mon, 3 Jan 2022 15:26:22 +0700 Subject: [PATCH 3/3] feat: add cors middleware --- cmd/serve.go | 5 ++++- go.mod | 1 + go.sum | 2 ++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/cmd/serve.go b/cmd/serve.go index 1275e8bc..87ba5ff3 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -21,6 +21,7 @@ import ( esStore "github.com/odpf/columbus/store/elasticsearch" "github.com/odpf/columbus/store/postgres" "github.com/odpf/columbus/tag" + "github.com/rs/cors" "github.com/sirupsen/logrus" "gorm.io/gorm" ) @@ -43,10 +44,12 @@ func Serve() { newRelicMonitor := initNewRelicMonitor(config) statsdMonitor := initStatsdMonitor(config) router := initRouter(esClient, newRelicMonitor, statsdMonitor, rootLogger) + c := cors.Default() + handler := c.Handler(router) serverAddr := fmt.Sprintf("%s:%s", config.ServerHost, config.ServerPort) log.Printf("starting http server on %s", serverAddr) - if err := http.ListenAndServe(serverAddr, router); err != nil { + if err := http.ListenAndServe(serverAddr, handler); err != nil { log.Errorf("listen and serve: %v", err) } } diff --git a/go.mod b/go.mod index 4348de55..51ebc02b 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/odpf/salt v0.0.0-20211028100023-de463ef825e1 github.com/olivere/elastic/v7 v7.0.12 github.com/pkg/errors v0.9.1 + github.com/rs/cors v1.8.2 github.com/sirupsen/logrus v1.8.1 github.com/spf13/cobra v1.2.1 github.com/spf13/viper v1.8.1 diff --git a/go.sum b/go.sum index 6fae0958..6f8e61d5 100644 --- a/go.sum +++ b/go.sum @@ -419,6 +419,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= +github.com/rs/cors v1.8.2 h1:KCooALfAYGs415Cwu5ABvv9n9509fSiG5SQJn/AQo4U= +github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=