Skip to content

Commit

Permalink
fix(lineage): lineage building memory spikes (#16)
Browse files Browse the repository at this point in the history
* spike: optimise memory use

* fix(lineage): monitors keep getting replaced by nil values

* fix(lineage): a broken test due to new way to fetch records

* refactor: add test for RecordRepository.GetAllIterator

* refactor: minor refactoring

Co-authored-by: Saravjeet 'Aman' Singh <saravjeetamansingh@gmail.com>
  • Loading branch information
StewartJingga and turtleDev authored Oct 4, 2021
1 parent a09f925 commit 38ea109
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ COVERFILE="/tmp/columbus.coverprofile"
all: build

build:
go build -ldflags "-X main.Version=${VERSION}" ${NAME}
go build -ldflags "-X cmd.Version=${VERSION}" ${NAME}

clean:
rm -rf columbus dist/
Expand Down
24 changes: 12 additions & 12 deletions api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,25 @@ func RegisterRoutes(router *mux.Router, config Config) {
config.TypeRepository,
)

lineageHandler := handlers.NewLineageHandler(
config.Logger.WithField("reporter", "lineage-handler"),
config.LineageProvider,
)

router.PathPrefix("/ping").Handler(handlers.NewHeartbeatHandler())
setupTypeRoutes(router, "/v1/types", typeHandler)

router.Path("/v1/search").
Methods(http.MethodGet).
HandlerFunc(searchHandler.Search)

// Temporarily disable lineage routes
// lineageHandler := handlers.NewLineageHandler(
// config.Logger.WithField("reporter", "lineage-handler"),
// config.LineageProvider,
// )
// router.PathPrefix("/v1/lineage/{type}/{id}").
// Methods(http.MethodGet).
// HandlerFunc(lineageHandler.GetLineage)

// router.PathPrefix("/v1/lineage").
// Methods(http.MethodGet).
// HandlerFunc(lineageHandler.ListLineage)
router.PathPrefix("/v1/lineage/{type}/{id}").
Methods(http.MethodGet).
HandlerFunc(lineageHandler.GetLineage)

router.PathPrefix("/v1/lineage").
Methods(http.MethodGet).
HandlerFunc(lineageHandler.ListLineage)
}

func setupTypeRoutes(router *mux.Router, baseURL string, typeHandler *handlers.TypeHandler) {
Expand Down
2 changes: 2 additions & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func initRouter(
if err != nil {
log.Fatalf("error creating searcher: %v", err)
}

lineageService, err := lineage.NewService(typeRepository, recordRepositoryFactory, lineage.Config{
RefreshInterval: config.LineageRefreshIntervalStr,
MetricsMonitor: statsdMonitor,
Expand All @@ -72,6 +73,7 @@ func initRouter(
if err != nil {
log.Fatal(err)
}
rootLogger.Info("lineage build complete")

router := mux.NewRouter()
if nrMonitor != nil {
Expand Down
24 changes: 24 additions & 0 deletions lib/mock/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ func (repo *RecordRepository) GetAll(ctx context.Context, filter models.RecordFi
return args.Get(0).([]models.Record), args.Error(1)
}

// GetAllIterator records the call on the mock and returns the iterator and
// error that were configured for it.
//
// A nil first return value (for example Return(nil, err)) is handed back as a
// nil iterator rather than panicking on the type assertion, so error paths of
// callers can be exercised. A non-nil value of the wrong type still panics.
func (repo *RecordRepository) GetAllIterator(ctx context.Context) (models.RecordIterator, error) {
	args := repo.Called(ctx)
	if args.Get(0) == nil {
		return nil, args.Error(1)
	}
	return args.Get(0).(models.RecordIterator), args.Error(1)
}

func (repo *RecordRepository) GetByID(ctx context.Context, id string) (models.Record, error) {
args := repo.Called(ctx, id)
return args.Get(0).(models.Record), args.Error(1)
Expand All @@ -67,6 +72,25 @@ func (repo *RecordRepository) Delete(ctx context.Context, id string) error {
return args.Error(0)
}

// RecordIterator is a mock of models.RecordIterator; its Scan, Next and Close
// methods return whatever was configured on the embedded mock.Mock.
type RecordIterator struct {
mock.Mock
}

// Scan records the call on the mock and returns the boolean configured as its
// first return value.
func (m *RecordIterator) Scan() bool {
	return m.Called().Bool(0)
}

// Next records the call on the mock and returns the batch of records
// configured as its first return value.
//
// A nil return value (Return(nil)) yields a nil slice instead of panicking on
// the type assertion. A non-nil value of the wrong type still panics.
func (m *RecordIterator) Next() []models.Record {
	args := m.Called()
	if args.Get(0) == nil {
		return nil
	}
	return args.Get(0).([]models.Record)
}

// Close records the call on the mock and returns the error configured as its
// first return value.
func (m *RecordIterator) Close() error {
	return m.Called().Error(0)
}

type RecordSearcher struct {
mock.Mock
}
Expand Down
14 changes: 9 additions & 5 deletions lineage/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,17 @@ func (builder defaultBuilder) populateTypeRecords(ctx context.Context, graph Adj
if err != nil {
return fmt.Errorf("error obtaing record repository: %w", err)
}
records, err := recordRepository.GetAll(ctx, models.RecordFilter{})

recordIter, err := recordRepository.GetAllIterator(ctx)
if err != nil {
return fmt.Errorf("error reading records for type %q: %w", typ.Name, err)
return fmt.Errorf("error getting record iterator: %w", err)
}
for _, record := range records {
if err := builder.addRecord(graph, typ, record, lineageProc); err != nil {
return fmt.Errorf("error adding record to graph: %w", err)
defer recordIter.Close()
for recordIter.Scan() {
for _, record := range recordIter.Next() {
if err := builder.addRecord(graph, typ, record, lineageProc); err != nil {
return fmt.Errorf("error adding record to graph: %w", err)
}
}
}
return nil
Expand Down
7 changes: 6 additions & 1 deletion lineage/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,13 @@ func initialiseRepos(datasets []dataset) (models.TypeRepository, models.RecordRe
for _, dataset := range datasets {
typ := dataset.Type.Normalise()
tr.On("GetByName", typ.Name).Return(typ, nil)
recordIterator := new(mock.RecordIterator)
recordIterator.On("Scan").Return(true).Once()
recordIterator.On("Scan").Return(false).Once()
recordIterator.On("Next").Return(dataset.Records)
recordIterator.On("Close").Return(nil)
recordRepo := new(mock.RecordRepository)
recordRepo.On("GetAll", ctx, models.RecordFilter{}).Return(dataset.Records, nil)
recordRepo.On("GetAllIterator", ctx).Return(recordIterator, nil)
rrf.On("For", typ).Return(recordRepo, nil)
typList = append(typList, typ)
}
Expand Down
17 changes: 9 additions & 8 deletions lineage/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package lineage
import (
"context"
"fmt"
"reflect"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -53,7 +54,6 @@ func (srv *Service) build() {
graph, err := srv.builder.Build(ctx, srv.typeRepo, srv.recordRepoFactory)
now := srv.timeSource.Now()
srv.metricsMonitor.Duration("lineageBuildTime", int(now.Sub(startTime)/time.Millisecond))

srv.mu.Lock()
defer srv.mu.Unlock()

Expand Down Expand Up @@ -104,11 +104,7 @@ func NewService(er models.TypeRepository, rrf models.RecordRepositoryFactory, co
return nil, err
}

// TODO: Find a solution to solve memory issue

// Temporarily disable building lineage on service creation.
// Columbus's memory keeps spiking when app is starting
// srv.build()
srv.build()

return srv, nil
}
Expand All @@ -124,11 +120,11 @@ func applyConfig(service *Service, config Config) error {
}
service.refreshInterval = lineageRefreshInterval

if config.MetricsMonitor != nil {
if !isNilMonitor(config.MetricsMonitor) {
service.metricsMonitor = config.MetricsMonitor
}

if config.PerformanceMonitor != nil {
if !isNilMonitor(config.PerformanceMonitor) {
service.performanceMonitor = config.PerformanceMonitor
}

Expand All @@ -142,3 +138,8 @@ func applyConfig(service *Service, config Config) error {

return nil
}

// isNilMonitor reports whether monitor is an untyped nil or deeply equal to
// the zero value of its dynamic type. The latter covers a typed nil pointer
// stored in an interface, which a plain `!= nil` comparison does not detect.
func isNilMonitor(monitor interface{}) bool {
	if monitor == nil {
		return true
	}
	zero := reflect.Zero(reflect.TypeOf(monitor)).Interface()
	return reflect.DeepEqual(monitor, zero)
}
9 changes: 9 additions & 0 deletions models/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import (
// TODO(Aman): add validation for mandatory fields? (landscape for instance)
type Record = map[string]interface{}

// RecordIterator iterates over records in batches.
//
// Callers are expected to loop while Scan returns true, consume the batch
// returned by Next on each pass, and call Close once they are done.
type RecordIterator interface {
// Scan reports whether Next should be called for another batch.
Scan() bool
// Next returns a batch of records.
Next() []Record
// Close is called when iteration is finished and returns any error
// encountered while closing.
Close() error
}

// RecordFilter is a filter intended to be used as a search
// criteria for operations involving record search
type RecordFilter = map[string][]string
Expand All @@ -22,6 +28,9 @@ type RecordRepository interface {
// used for return documents matching the search criteria.
GetAll(context.Context, RecordFilter) ([]Record, error)

// GetAllIterator returns a RecordIterator that iterates over records in batches
GetAllIterator(context.Context) (RecordIterator, error)

// GetByID returns a record by its ID.
// The field that contains this ID is defined by the
// type to which this record belongs
Expand Down
65 changes: 65 additions & 0 deletions store/record_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/odpf/columbus/models"
"github.com/olivere/elastic/v7"
)
Expand Down Expand Up @@ -92,6 +93,41 @@ func (repo *RecordRepository) writeInsertAction(w io.Writer, record models.Recor
return json.NewEncoder(w).Encode(action)
}

// GetAllIterator runs a scrolling search (built from an empty RecordFilter)
// against the index named after the repository's record type and returns an
// iterator over the resulting batches.
//
// The first batch is decoded here; the iterator fetches later batches using
// the scroll ID from this response. On success the caller must Close the
// returned iterator. On failure no iterator is returned, so the response body
// is closed here to avoid leaking it.
func (repo *RecordRepository) GetAllIterator(ctx context.Context) (models.RecordIterator, error) {
	body, err := repo.getAllQuery(models.RecordFilter{})
	if err != nil {
		return nil, fmt.Errorf("error building search query: %w", err)
	}

	resp, err := repo.cli.Search(
		repo.cli.Search.WithIndex(repo.recordType.Name),
		repo.cli.Search.WithBody(body),
		repo.cli.Search.WithScroll(defaultScrollTimeout),
		repo.cli.Search.WithSize(defaultScrollBatchSize),
		repo.cli.Search.WithContext(ctx),
	)
	if err != nil {
		return nil, fmt.Errorf("error executing search: %w", err)
	}
	if resp.IsError() {
		// Deferred so the body is still readable while the error reason is
		// extracted, and closed once this function returns.
		defer resp.Body.Close()
		return nil, fmt.Errorf("error response from elasticsearch: %s", errorReasonFromResponse(resp))
	}

	var response searchResponse
	if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
		// The decode error is what the caller needs; a close error here
		// would only mask it.
		resp.Body.Close()
		return nil, fmt.Errorf("error decoding es response: %w", err)
	}

	return &recordIterator{
		resp:     resp,
		records:  repo.toRecordList(response),
		scrollID: response.ScrollID,
		repo:     repo,
	}, nil
}

func (repo *RecordRepository) GetAll(ctx context.Context, filters models.RecordFilter) ([]models.Record, error) {
// XXX(Aman): we should probably think about result ordering, if the client
// is going to slice the data for pagination. Does ES guarantee the result order?
Expand Down Expand Up @@ -246,6 +282,35 @@ func (repo *RecordRepository) Delete(ctx context.Context, id string) error {
return nil
}

// recordIterator is the internal implementation of models.RecordIterator by RecordRepository.
// It holds one batch of records at a time: Next hands out the held batch and
// fetches the following one through the repository's scrollRecords.
type recordIterator struct {
// resp is the response of the initial search request; Close closes its body.
resp *esapi.Response
// records is the batch that the next call to Next will return.
records []models.Record
// repo is the repository used to scroll for further batches.
repo *RecordRepository
// scrollID is passed to scrollRecords to fetch the next batch. Next clears
// it when a scroll returns no records, and Scan reports false once it is blank.
scrollID string
}

// Scan reports whether there is still a scroll ID to continue from, i.e.
// whether the scroll ID is non-blank after trimming whitespace.
func (it *recordIterator) Scan() bool {
	return strings.TrimSpace(it.scrollID) != ""
}

// Next returns the batch currently held by the iterator and replaces it with
// the following batch obtained from scrollRecords.
//
// It panics if scrolling fails. When the fetched batch is empty the scroll ID
// is cleared, which makes Scan report false afterwards.
func (it *recordIterator) Next() (prev []models.Record) {
	prev = it.records

	batch, nextID, err := it.repo.scrollRecords(context.Background(), it.scrollID)
	it.records, it.scrollID = batch, nextID
	if err != nil {
		panic("error scrolling results:" + err.Error())
	}

	// An empty batch marks the end of the scroll.
	if len(it.records) == 0 {
		it.scrollID = ""
	}
	return prev
}

// Close closes the body of the initial search response and returns the error
// from doing so, if any.
func (it *recordIterator) Close() error {
	body := it.resp.Body
	return body.Close()
}

// RecordRepositoryFactory can be used to construct a RecordRepository
// for a certain type
type RecordRepositoryFactory struct {
Expand Down
38 changes: 38 additions & 0 deletions store/record_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,44 @@ func TestRecordRepository(t *testing.T) {
return
}

// GetAllIterator: draining the iterator must yield exactly the records
// stored in the fixture file.
t.Run("GetAllIterator", func(t *testing.T) {
	t.Run("should return record iterator to iterate records", func(t *testing.T) {
		expectedResults := []models.Record{}
		raw, err := ioutil.ReadFile("./testdata/dagger.json")
		if err != nil {
			t.Fatalf("error reading results file: %v", err)
		}
		if err := json.Unmarshal(raw, &expectedResults); err != nil {
			t.Fatalf("error parsing results file: %v", err)
		}

		iterator, err := recordRepo.GetAllIterator(ctx)
		if err != nil {
			t.Fatalf("error executing GetAllIterator: %v", err)
		}

		// Collect every batch the iterator hands out.
		var actualResults []models.Record
		for iterator.Scan() {
			actualResults = append(actualResults, iterator.Next()...)
		}
		// Surface close failures instead of silently dropping them.
		if err := iterator.Close(); err != nil {
			t.Errorf("error closing iterator: %v", err)
		}

		if !reflect.DeepEqual(expectedResults, actualResults) {
			t.Error(incorrectResultsError(expectedResults, actualResults))
		}
	})
})

t.Run("GetAll", func(t *testing.T) {
type testCase struct {
Description string
Expand Down

0 comments on commit 38ea109

Please sign in to comment.