# build compiles the binary and stamps the release version into it.
# The linker's -X flag only takes effect when the variable is addressed by its
# full import path; a bare "cmd.Version" matches no package and is silently
# ignored, leaving Version empty at runtime.
# NOTE(review): module path taken from the project's own imports
# (github.com/odpf/columbus/...) — confirm against go.mod.
build:
	go build -ldflags "-X github.com/odpf/columbus/cmd.Version=${VERSION}" ${NAME}
+ HandlerFunc(lineageHandler.ListLineage) } func setupTypeRoutes(router *mux.Router, baseURL string, typeHandler *handlers.TypeHandler) { diff --git a/cmd/serve.go b/cmd/serve.go index 34ddd1f0..d1413536 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -64,6 +64,7 @@ func initRouter( if err != nil { log.Fatalf("error creating searcher: %v", err) } + lineageService, err := lineage.NewService(typeRepository, recordRepositoryFactory, lineage.Config{ RefreshInterval: config.LineageRefreshIntervalStr, MetricsMonitor: statsdMonitor, @@ -72,6 +73,7 @@ func initRouter( if err != nil { log.Fatal(err) } + rootLogger.Info("lineage build complete") router := mux.NewRouter() if nrMonitor != nil { diff --git a/lib/mock/mocks.go b/lib/mock/mocks.go index 58bfb798..76fc909d 100644 --- a/lib/mock/mocks.go +++ b/lib/mock/mocks.go @@ -57,6 +57,11 @@ func (repo *RecordRepository) GetAll(ctx context.Context, filter models.RecordFi return args.Get(0).([]models.Record), args.Error(1) } +func (repo *RecordRepository) GetAllIterator(ctx context.Context) (models.RecordIterator, error) { + args := repo.Called(ctx) + return args.Get(0).(models.RecordIterator), args.Error(1) +} + func (repo *RecordRepository) GetByID(ctx context.Context, id string) (models.Record, error) { args := repo.Called(ctx, id) return args.Get(0).(models.Record), args.Error(1) @@ -67,6 +72,25 @@ func (repo *RecordRepository) Delete(ctx context.Context, id string) error { return args.Error(0) } +type RecordIterator struct { + mock.Mock +} + +func (m *RecordIterator) Scan() bool { + args := m.Called() + return args.Bool(0) +} + +func (m *RecordIterator) Next() []models.Record { + args := m.Called() + return args.Get(0).([]models.Record) +} + +func (m *RecordIterator) Close() error { + args := m.Called() + return args.Error(0) +} + type RecordSearcher struct { mock.Mock } diff --git a/lineage/builder.go b/lineage/builder.go index 0eabdb04..17f7f756 100644 --- a/lineage/builder.go +++ b/lineage/builder.go @@ -105,13 +105,17 
@@ func (builder defaultBuilder) populateTypeRecords(ctx context.Context, graph Adj if err != nil { return fmt.Errorf("error obtaing record repository: %w", err) } - records, err := recordRepository.GetAll(ctx, models.RecordFilter{}) + + recordIter, err := recordRepository.GetAllIterator(ctx) if err != nil { - return fmt.Errorf("error reading records for type %q: %w", typ.Name, err) + return fmt.Errorf("error getting record iterator: %w", err) } - for _, record := range records { - if err := builder.addRecord(graph, typ, record, lineageProc); err != nil { - return fmt.Errorf("error adding record to graph: %w", err) + defer recordIter.Close() + for recordIter.Scan() { + for _, record := range recordIter.Next() { + if err := builder.addRecord(graph, typ, record, lineageProc); err != nil { + return fmt.Errorf("error adding record to graph: %w", err) + } } } return nil diff --git a/lineage/builder_test.go b/lineage/builder_test.go index 48a07654..eed384d4 100644 --- a/lineage/builder_test.go +++ b/lineage/builder_test.go @@ -29,8 +29,13 @@ func initialiseRepos(datasets []dataset) (models.TypeRepository, models.RecordRe for _, dataset := range datasets { typ := dataset.Type.Normalise() tr.On("GetByName", typ.Name).Return(typ, nil) + recordIterator := new(mock.RecordIterator) + recordIterator.On("Scan").Return(true).Once() + recordIterator.On("Scan").Return(false).Once() + recordIterator.On("Next").Return(dataset.Records) + recordIterator.On("Close").Return(nil) recordRepo := new(mock.RecordRepository) - recordRepo.On("GetAll", ctx, models.RecordFilter{}).Return(dataset.Records, nil) + recordRepo.On("GetAllIterator", ctx).Return(recordIterator, nil) rrf.On("For", typ).Return(recordRepo, nil) typList = append(typList, typ) } diff --git a/lineage/service.go b/lineage/service.go index 16d34e76..64e42248 100644 --- a/lineage/service.go +++ b/lineage/service.go @@ -3,6 +3,7 @@ package lineage import ( "context" "fmt" + "reflect" "sync" "sync/atomic" "time" @@ -53,7 +54,6 @@ 
// isNilMonitor reports whether monitor is unusable as a monitor: either an
// untyped nil interface, or an interface wrapping a nil pointer, map, slice,
// func or channel (the "typed nil" case that a plain `!= nil` check misses).
//
// Values of any other kind (e.g. a struct implementing the monitor interface
// by value) can never be nil and are therefore always reported as usable,
// even when they equal their zero value.
func isNilMonitor(monitor interface{}) bool {
	if monitor == nil {
		return true
	}

	v := reflect.ValueOf(monitor)
	switch v.Kind() {
	case reflect.Ptr, reflect.Map, reflect.Slice, reflect.Func, reflect.Chan, reflect.Interface:
		// IsNil is only defined for these kinds; it panics for the rest.
		return v.IsNil()
	default:
		return false
	}
}
GetAll(context.Context, RecordFilter) ([]Record, error) + // GetAllIterator returns RecordIterator to iterate records by batches + GetAllIterator(context.Context) (RecordIterator, error) + // GetByID returns a record by it's id. // The field that contains this ID is defined by the // type to which this record belongs diff --git a/store/record_repository.go b/store/record_repository.go index 2b10549e..fb1b13f9 100644 --- a/store/record_repository.go +++ b/store/record_repository.go @@ -11,6 +11,7 @@ import ( "time" "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/odpf/columbus/models" "github.com/olivere/elastic/v7" ) @@ -92,6 +93,41 @@ func (repo *RecordRepository) writeInsertAction(w io.Writer, record models.Recor return json.NewEncoder(w).Encode(action) } +func (repo *RecordRepository) GetAllIterator(ctx context.Context) (models.RecordIterator, error) { + body, err := repo.getAllQuery(models.RecordFilter{}) + if err != nil { + return nil, fmt.Errorf("error building search query: %w", err) + } + + resp, err := repo.cli.Search( + repo.cli.Search.WithIndex(repo.recordType.Name), + repo.cli.Search.WithBody(body), + repo.cli.Search.WithScroll(defaultScrollTimeout), + repo.cli.Search.WithSize(defaultScrollBatchSize), + repo.cli.Search.WithContext(ctx), + ) + if err != nil { + return nil, fmt.Errorf("error executing search: %w", err) + } + if resp.IsError() { + return nil, fmt.Errorf("error response from elasticsearch: %s", errorReasonFromResponse(resp)) + } + + var response searchResponse + err = json.NewDecoder(resp.Body).Decode(&response) + if err != nil { + return nil, fmt.Errorf("error decoding es response: %w", err) + } + var results = repo.toRecordList(response) + it := recordIterator{ + resp: resp, + records: results, + scrollID: response.ScrollID, + repo: repo, + } + return &it, nil +} + func (repo *RecordRepository) GetAll(ctx context.Context, filters models.RecordFilter) ([]models.Record, error) { // 
XXX(Aman): we should probably think about result ordering, if the client // is going to slice the data for pagination. Does ES guarantee the result order? @@ -246,6 +282,35 @@ func (repo *RecordRepository) Delete(ctx context.Context, id string) error { return nil } +// recordIterator is the internal implementation of models.RecordIterator by RecordRepository +type recordIterator struct { + resp *esapi.Response + records []models.Record + repo *RecordRepository + scrollID string +} + +func (it *recordIterator) Scan() bool { + return len(strings.TrimSpace(it.scrollID)) > 0 +} + +func (it *recordIterator) Next() (prev []models.Record) { + prev = it.records + var err error + it.records, it.scrollID, err = it.repo.scrollRecords(context.Background(), it.scrollID) + if err != nil { + panic("error scrolling results:" + err.Error()) + } + if len(it.records) == 0 { + it.scrollID = "" + } + return +} + +func (it *recordIterator) Close() error { + return it.resp.Body.Close() +} + // RecordRepositoryFactory can be used to construct a RecordRepository // for a certain type type RecordRepositoryFactory struct { diff --git a/store/record_repository_test.go b/store/record_repository_test.go index 6c2623fc..3ad7f985 100644 --- a/store/record_repository_test.go +++ b/store/record_repository_test.go @@ -163,6 +163,44 @@ func TestRecordRepository(t *testing.T) { return } + t.Run("GetAllIterator", func(t *testing.T) { + type testCase struct { + Description string + Filter models.RecordFilter + ResultsFile string + } + + t.Run("should return record iterator to iterate records", func(t *testing.T) { + expectedResults := []models.Record{} + raw, err := ioutil.ReadFile("./testdata/dagger.json") + if err != nil { + t.Fatalf("error reading results file: %v", err) + return + } + err = json.Unmarshal(raw, &expectedResults) + if err != nil { + t.Fatalf("error parsing results file: %v", err) + return + } + + var actualResults []models.Record + iterator, err := recordRepo.GetAllIterator(ctx) + if 
err != nil { + t.Fatalf("error executing GetAllIterator: %v", err) + return + } + for iterator.Scan() { + actualResults = append(actualResults, iterator.Next()...) + } + iterator.Close() + + if reflect.DeepEqual(expectedResults, actualResults) == false { + t.Error(incorrectResultsError(expectedResults, actualResults)) + return + } + }) + }) + t.Run("GetAll", func(t *testing.T) { type testCase struct { Description string