Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(lineage): lineage building memory spikes #16

Merged
merged 91 commits into from
Oct 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
b23d858
Initial Commit
StewartJingga Mar 25, 2021
6f7f102
chore: add LICENSE
StewartJingga Mar 26, 2021
fa23d36
chore: update gitignore
StewartJingga Mar 26, 2021
96a3de0
feat: setup github workflow
StewartJingga Mar 29, 2021
64473f5
Merge remote-tracking branch 'github/main'
StewartJingga Mar 29, 2021
9f5ffc0
Merge branch 'prepare-for-github-migration'
StewartJingga Mar 29, 2021
31e2ce1
chore: update makefile
StewartJingga Mar 30, 2021
bb347dc
refactor: lineage package
StewartJingga Mar 30, 2021
037094e
refactor: lineage service error message
StewartJingga Mar 30, 2021
1e4cc60
feat: add config package
StewartJingga Mar 30, 2021
83c35b0
refactor: move route registration to web package
StewartJingga Mar 30, 2021
a7a23ba
refactor: move config initiation to cmd package
StewartJingga Mar 30, 2021
dd8f122
refactor: change web package to api
StewartJingga Mar 30, 2021
800cacf
refactor: cmd package
StewartJingga Mar 30, 2021
23b402d
refactor: rename es package to store
StewartJingga Mar 30, 2021
7122bdd
refactor: move handlers to its own package
StewartJingga Mar 30, 2021
a236c4c
Merge pull request #2 from odpf/align-odpf
StewartJingga Mar 30, 2021
d1afdc0
chore: untrack coverage result
StewartJingga Mar 31, 2021
ea3be55
feat: add GET /v1/types
StewartJingga Mar 31, 2021
1ae7bad
feat: add GET v1/types/{name}/details
StewartJingga Mar 31, 2021
b903835
docs: update swagger.yaml
StewartJingga Mar 31, 2021
403da21
feat: add DELETE /v1/types/{name}
StewartJingga Mar 31, 2021
2868a62
feat: use custom error for reserved type name
StewartJingga Mar 31, 2021
d23009e
fix: close reader after deleting type
StewartJingga Mar 31, 2021
7f85cf2
feat: add DELETE /v1/types/{name}/records/{id}
StewartJingga Mar 31, 2021
2a558dc
docs: update api docs
StewartJingga Mar 31, 2021
e2bd3c0
feat: log successful type deletion
StewartJingga Mar 31, 2021
74ad74d
refactor: return 204 on successful resource deletion
StewartJingga Apr 5, 2021
a075739
Merge pull request #3 from odpf/complete-crud-operations
StewartJingga Apr 5, 2021
ebf1db8
feat: switch search strategy to query time boosting
StewartJingga Apr 5, 2021
ffcf3a2
feat: use fields from type when adding boosts
StewartJingga Apr 5, 2021
031d0ec
feat: enable fuzzy search
StewartJingga Apr 5, 2021
3d2d371
Merge branch 'main' into improve-search-relevance
StewartJingga Apr 5, 2021
e6095c8
fix: fix error on test
StewartJingga Apr 6, 2021
19a190e
chore: rename github action
StewartJingga Apr 6, 2021
4ed0c05
Merge branch 'main' into improve-search-relevance
StewartJingga Apr 6, 2021
b60248b
docs: publish docs to gitbook
ravisuhag Apr 6, 2021
2bc14ab
GitBook: [main] 9 pages modified
ravisuhag Apr 6, 2021
8c03584
refactor: move test cases from fixture to code
StewartJingga Apr 6, 2021
cf78d29
feat: mak search's types cache configurable
StewartJingga Apr 6, 2021
efd084f
Merge remote-tracking branch 'origin/main' into improve-search-relevance
StewartJingga Apr 6, 2021
52c7123
Merge pull request #4 from odpf/improve-search-relevance
StewartJingga Apr 6, 2021
1278e18
GitBook: [main] 5 pages modified
ravisuhag Apr 6, 2021
2065f8c
fix: config not being populated using env vars
StewartJingga Apr 6, 2021
b45c4b3
Merge branch 'main' of github.com:odpf/columbus into main
StewartJingga Apr 6, 2021
797e458
GitBook: [main] 14 pages modified
ravisuhag Apr 6, 2021
bb5f008
chore: update readme with overview
ravisuhag Apr 6, 2021
8746d17
chore: update overview diagram
ravisuhag Apr 6, 2021
ed82444
feat: remove /v1/entities routes
StewartJingga Apr 7, 2021
d72ee2c
feat: modify typehandler to not act as a router
StewartJingga Apr 7, 2021
6b8fe6a
feat: modify searchHandler to not act as a router
StewartJingga Apr 7, 2021
76c88d9
feat: modify lineageHandler to not act as a router
StewartJingga Apr 7, 2021
d961b46
refactor: restructure metrics package
StewartJingga Apr 8, 2021
207fd41
fix: broken test
StewartJingga Apr 8, 2021
4036f0c
feat: allow passing context to type repository methods
StewartJingga Apr 9, 2021
3e0f998
feat: allow passing context to record repository methods
StewartJingga Apr 10, 2021
07a7496
feat: allow passing context to searcher methods
StewartJingga Apr 10, 2021
df650d5
feat: hook newrelic to elasticsearch process
StewartJingga Apr 10, 2021
aa8a3eb
feat: monitor es performance during lineage building
StewartJingga Apr 10, 2021
256e1e5
Merge pull request #5 from odpf/remove-entity-apis
StewartJingga Apr 12, 2021
db88c88
Merge pull request #6 from odpf/restructure-routes-registration
StewartJingga Apr 12, 2021
e9356c6
Merge branch 'main' into refactor-metrics-package
StewartJingga Apr 12, 2021
d35ba05
feat: add another API for ingesting records
StewartJingga Apr 12, 2021
2d4c502
feat: deprecate apis to be restful
StewartJingga Apr 12, 2021
5a729ca
Merge branch 'main' into refactor-metrics-package
StewartJingga Apr 13, 2021
1d07942
Merge pull request #7 from odpf/refactor-metrics-package
StewartJingga Apr 13, 2021
42c8073
Merge remote-tracking branch 'origin/main' into deprecate-and-add-new…
StewartJingga Apr 13, 2021
736813e
Merge pull request #8 from odpf/deprecate-and-add-new-apis
StewartJingga Apr 13, 2021
1aa9cb2
fix: search API does not respect filter properly
StewartJingga Apr 15, 2021
e0de59e
fix: fetching empty types returns null
StewartJingga Apr 15, 2021
1757551
fix: empty search whitelist to not causing error
StewartJingga Apr 16, 2021
5253c5b
Merge pull request #10 from odpf/fix-search-filter-not-working-as-exp…
pyadav Apr 19, 2021
27c3ab8
Merge pull request #12 from odpf/revamp-search-whitelist
StewartJingga Apr 19, 2021
33d30d5
Merge pull request #11 from odpf/return-empty-list-on-empty-types
StewartJingga Apr 19, 2021
06a912e
GitBook: [main] 4 pages modified
ravisuhag May 4, 2021
1093c7e
docs: update readme
ravisuhag May 4, 2021
54cd0f9
docs: add changelog
ravisuhag May 4, 2021
0d2454c
GitBook: [main] 3 pages and one asset modified
ravisuhag May 4, 2021
f3adac0
fix: disable lineage routes
StewartJingga May 7, 2021
48846a5
doc: add architecture doc
StewartJingga May 11, 2021
7c60ae6
doc: add roadmap
StewartJingga May 19, 2021
b261d3c
doc: add api reference
StewartJingga May 19, 2021
89a7783
merge: pull request #13 from odpf/update-docs
ravisuhag May 21, 2021
7097f48
chore: publish image to Dockerhub
StewartJingga May 24, 2021
a5cfdf9
chore: change image name to Dockerhub
StewartJingga May 24, 2021
6ff5255
spike: optimise memory use
Sep 30, 2021
accbdd4
Merge branch 'main' into spike-memory-optimisation
StewartJingga Oct 1, 2021
2b6f423
fix(lineage): monitors keep getting replaced by nil values
StewartJingga Oct 1, 2021
93c3188
fix(lineage): a broken test due to new way to fetch records
StewartJingga Oct 1, 2021
4e7455d
refactor: add test for RecordRepository.GetAllIterator
StewartJingga Oct 1, 2021
722b231
refactor: minor refactoring
StewartJingga Oct 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ COVERFILE="/tmp/columbus.coverprofile"
all: build

build:
go build -ldflags "-X main.Version=${VERSION}" ${NAME}
go build -ldflags "-X cmd.Version=${VERSION}" ${NAME}

clean:
rm -rf columbus dist/
Expand Down
24 changes: 12 additions & 12 deletions api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,25 @@ func RegisterRoutes(router *mux.Router, config Config) {
config.TypeRepository,
)

lineageHandler := handlers.NewLineageHandler(
config.Logger.WithField("reporter", "lineage-handler"),
config.LineageProvider,
)

router.PathPrefix("/ping").Handler(handlers.NewHeartbeatHandler())
setupTypeRoutes(router, "/v1/types", typeHandler)

router.Path("/v1/search").
Methods(http.MethodGet).
HandlerFunc(searchHandler.Search)

// Temporarily disable lineage routes
// lineageHandler := handlers.NewLineageHandler(
// config.Logger.WithField("reporter", "lineage-handler"),
// config.LineageProvider,
// )
// router.PathPrefix("/v1/lineage/{type}/{id}").
// Methods(http.MethodGet).
// HandlerFunc(lineageHandler.GetLineage)

// router.PathPrefix("/v1/lineage").
// Methods(http.MethodGet).
// HandlerFunc(lineageHandler.ListLineage)
router.PathPrefix("/v1/lineage/{type}/{id}").
Methods(http.MethodGet).
HandlerFunc(lineageHandler.GetLineage)

router.PathPrefix("/v1/lineage").
Methods(http.MethodGet).
HandlerFunc(lineageHandler.ListLineage)
}

func setupTypeRoutes(router *mux.Router, baseURL string, typeHandler *handlers.TypeHandler) {
Expand Down
2 changes: 2 additions & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func initRouter(
if err != nil {
log.Fatalf("error creating searcher: %v", err)
}

lineageService, err := lineage.NewService(typeRepository, recordRepositoryFactory, lineage.Config{
RefreshInterval: config.LineageRefreshIntervalStr,
MetricsMonitor: statsdMonitor,
Expand All @@ -72,6 +73,7 @@ func initRouter(
if err != nil {
log.Fatal(err)
}
rootLogger.Info("lineage build complete")

router := mux.NewRouter()
if nrMonitor != nil {
Expand Down
24 changes: 24 additions & 0 deletions lib/mock/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ func (repo *RecordRepository) GetAll(ctx context.Context, filter models.RecordFi
return args.Get(0).([]models.Record), args.Error(1)
}

func (repo *RecordRepository) GetAllIterator(ctx context.Context) (models.RecordIterator, error) {
args := repo.Called(ctx)
return args.Get(0).(models.RecordIterator), args.Error(1)
}

func (repo *RecordRepository) GetByID(ctx context.Context, id string) (models.Record, error) {
args := repo.Called(ctx, id)
return args.Get(0).(models.Record), args.Error(1)
Expand All @@ -67,6 +72,25 @@ func (repo *RecordRepository) Delete(ctx context.Context, id string) error {
return args.Error(0)
}

type RecordIterator struct {
mock.Mock
}

func (m *RecordIterator) Scan() bool {
args := m.Called()
return args.Bool(0)
}

func (m *RecordIterator) Next() []models.Record {
args := m.Called()
return args.Get(0).([]models.Record)
}

func (m *RecordIterator) Close() error {
args := m.Called()
return args.Error(0)
}

type RecordSearcher struct {
mock.Mock
}
Expand Down
14 changes: 9 additions & 5 deletions lineage/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,17 @@ func (builder defaultBuilder) populateTypeRecords(ctx context.Context, graph Adj
if err != nil {
return fmt.Errorf("error obtaing record repository: %w", err)
}
records, err := recordRepository.GetAll(ctx, models.RecordFilter{})

recordIter, err := recordRepository.GetAllIterator(ctx)
if err != nil {
return fmt.Errorf("error reading records for type %q: %w", typ.Name, err)
return fmt.Errorf("error getting record iterator: %w", err)
}
for _, record := range records {
if err := builder.addRecord(graph, typ, record, lineageProc); err != nil {
return fmt.Errorf("error adding record to graph: %w", err)
defer recordIter.Close()
for recordIter.Scan() {
for _, record := range recordIter.Next() {
if err := builder.addRecord(graph, typ, record, lineageProc); err != nil {
return fmt.Errorf("error adding record to graph: %w", err)
}
}
}
return nil
Expand Down
7 changes: 6 additions & 1 deletion lineage/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,13 @@ func initialiseRepos(datasets []dataset) (models.TypeRepository, models.RecordRe
for _, dataset := range datasets {
typ := dataset.Type.Normalise()
tr.On("GetByName", typ.Name).Return(typ, nil)
recordIterator := new(mock.RecordIterator)
recordIterator.On("Scan").Return(true).Once()
recordIterator.On("Scan").Return(false).Once()
recordIterator.On("Next").Return(dataset.Records)
recordIterator.On("Close").Return(nil)
recordRepo := new(mock.RecordRepository)
recordRepo.On("GetAll", ctx, models.RecordFilter{}).Return(dataset.Records, nil)
recordRepo.On("GetAllIterator", ctx).Return(recordIterator, nil)
rrf.On("For", typ).Return(recordRepo, nil)
typList = append(typList, typ)
}
Expand Down
17 changes: 9 additions & 8 deletions lineage/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package lineage
import (
"context"
"fmt"
"reflect"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -53,7 +54,6 @@ func (srv *Service) build() {
graph, err := srv.builder.Build(ctx, srv.typeRepo, srv.recordRepoFactory)
now := srv.timeSource.Now()
srv.metricsMonitor.Duration("lineageBuildTime", int(now.Sub(startTime)/time.Millisecond))

srv.mu.Lock()
defer srv.mu.Unlock()

Expand Down Expand Up @@ -104,11 +104,7 @@ func NewService(er models.TypeRepository, rrf models.RecordRepositoryFactory, co
return nil, err
}

// TODO: Find a solution to solve memory issue

// Temporarily disable building lineage on service creation.
// Columbus's memory keeps spiking when app is starting
// srv.build()
srv.build()

return srv, nil
}
Expand All @@ -124,11 +120,11 @@ func applyConfig(service *Service, config Config) error {
}
service.refreshInterval = lineageRefreshInterval

if config.MetricsMonitor != nil {
if !isNilMonitor(config.MetricsMonitor) {
service.metricsMonitor = config.MetricsMonitor
}

if config.PerformanceMonitor != nil {
if !isNilMonitor(config.PerformanceMonitor) {
service.performanceMonitor = config.PerformanceMonitor
}

Expand All @@ -142,3 +138,8 @@ func applyConfig(service *Service, config Config) error {

return nil
}

func isNilMonitor(monitor interface{}) bool {
v := reflect.ValueOf(monitor)
return !v.IsValid() || reflect.DeepEqual(v.Interface(), reflect.Zero(v.Type()).Interface())
}
9 changes: 9 additions & 0 deletions models/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import (
// TODO(Aman): add validation for mandatory fields? (landscape for instance)
type Record = map[string]interface{}

type RecordIterator interface {
Scan() bool
Next() []Record
Close() error
}

// RecordFilter is a filter intended to be used as a search
// criteria for operations involving record search
type RecordFilter = map[string][]string
Expand All @@ -22,6 +28,9 @@ type RecordRepository interface {
// used for return documents matching the search criteria.
GetAll(context.Context, RecordFilter) ([]Record, error)

// GetAllIterator returns RecordIterator to iterate records by batches
GetAllIterator(context.Context) (RecordIterator, error)

// GetByID returns a record by it's id.
// The field that contains this ID is defined by the
// type to which this record belongs
Expand Down
65 changes: 65 additions & 0 deletions store/record_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/odpf/columbus/models"
"github.com/olivere/elastic/v7"
)
Expand Down Expand Up @@ -92,6 +93,41 @@ func (repo *RecordRepository) writeInsertAction(w io.Writer, record models.Recor
return json.NewEncoder(w).Encode(action)
}

func (repo *RecordRepository) GetAllIterator(ctx context.Context) (models.RecordIterator, error) {
body, err := repo.getAllQuery(models.RecordFilter{})
if err != nil {
return nil, fmt.Errorf("error building search query: %w", err)
}

resp, err := repo.cli.Search(
repo.cli.Search.WithIndex(repo.recordType.Name),
repo.cli.Search.WithBody(body),
repo.cli.Search.WithScroll(defaultScrollTimeout),
repo.cli.Search.WithSize(defaultScrollBatchSize),
repo.cli.Search.WithContext(ctx),
)
if err != nil {
return nil, fmt.Errorf("error executing search: %w", err)
}
if resp.IsError() {
return nil, fmt.Errorf("error response from elasticsearch: %s", errorReasonFromResponse(resp))
}

var response searchResponse
err = json.NewDecoder(resp.Body).Decode(&response)
if err != nil {
return nil, fmt.Errorf("error decoding es response: %w", err)
}
var results = repo.toRecordList(response)
it := recordIterator{
resp: resp,
records: results,
scrollID: response.ScrollID,
repo: repo,
}
return &it, nil
}

func (repo *RecordRepository) GetAll(ctx context.Context, filters models.RecordFilter) ([]models.Record, error) {
// XXX(Aman): we should probably think about result ordering, if the client
// is going to slice the data for pagination. Does ES guarantee the result order?
Expand Down Expand Up @@ -246,6 +282,35 @@ func (repo *RecordRepository) Delete(ctx context.Context, id string) error {
return nil
}

// recordIterator is the internal implementation of models.RecordIterator by RecordRepository
type recordIterator struct {
resp *esapi.Response
records []models.Record
repo *RecordRepository
scrollID string
}

func (it *recordIterator) Scan() bool {
return len(strings.TrimSpace(it.scrollID)) > 0
}

func (it *recordIterator) Next() (prev []models.Record) {
prev = it.records
var err error
it.records, it.scrollID, err = it.repo.scrollRecords(context.Background(), it.scrollID)
if err != nil {
panic("error scrolling results:" + err.Error())
}
if len(it.records) == 0 {
it.scrollID = ""
}
return
}

func (it *recordIterator) Close() error {
return it.resp.Body.Close()
}

// RecordRepositoryFactory can be used to construct a RecordRepository
// for a certain type
type RecordRepositoryFactory struct {
Expand Down
38 changes: 38 additions & 0 deletions store/record_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,44 @@ func TestRecordRepository(t *testing.T) {
return
}

t.Run("GetAllIterator", func(t *testing.T) {
type testCase struct {
Description string
Filter models.RecordFilter
ResultsFile string
}

t.Run("should return record iterator to iterate records", func(t *testing.T) {
expectedResults := []models.Record{}
raw, err := ioutil.ReadFile("./testdata/dagger.json")
if err != nil {
t.Fatalf("error reading results file: %v", err)
return
}
err = json.Unmarshal(raw, &expectedResults)
if err != nil {
t.Fatalf("error parsing results file: %v", err)
return
}

var actualResults []models.Record
iterator, err := recordRepo.GetAllIterator(ctx)
if err != nil {
t.Fatalf("error executing GetAllIterator: %v", err)
return
}
for iterator.Scan() {
actualResults = append(actualResults, iterator.Next()...)
}
iterator.Close()

if reflect.DeepEqual(expectedResults, actualResults) == false {
t.Error(incorrectResultsError(expectedResults, actualResults))
return
}
})
})

t.Run("GetAll", func(t *testing.T) {
type testCase struct {
Description string
Expand Down