Skip to content

Commit

Permalink
feat(lineage): delete lineage for the asset when deleting asset. (#220)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjali9791 authored Mar 24, 2023
1 parent fd5e5ee commit acb3c90
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 4 deletions.
11 changes: 11 additions & 0 deletions core/asset/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ type NotFoundError struct {
URN string
}

type LineageNotFoundError struct {
URN string
}

func (err LineageNotFoundError) Error() string {
if err.URN != "" {
return fmt.Sprintf("no lineage found for record: %q", err.URN)
}
return "could not find lineage"
}

func (err NotFoundError) Error() string {
if err.AssetID != "" {
return fmt.Sprintf("no such record: %q", err.AssetID)
Expand Down
1 change: 1 addition & 0 deletions core/asset/lineage.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type LineageQuery struct {
type LineageRepository interface {
GetGraph(ctx context.Context, urn string, query LineageQuery) (LineageGraph, error)
Upsert(ctx context.Context, urn string, upstreams, downstreams []string) error
DeleteByURN(ctx context.Context, urn string) error
}

type LineageGraph []LineageEdge
Expand Down
40 changes: 39 additions & 1 deletion core/asset/mocks/lineage_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 13 additions & 3 deletions core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,28 @@ func (s *Service) UpsertAssetWithoutLineage(ctx context.Context, ast *Asset) (st

func (s *Service) DeleteAsset(ctx context.Context, id string) error {
if isValidUUID(id) {
asset, err := s.assetRepository.GetByID(ctx, id)
if err != nil {
return err
}
if err := s.assetRepository.DeleteByID(ctx, id); err != nil {
return err
}

return s.discoveryRepository.DeleteByID(ctx, id)
if err := s.discoveryRepository.DeleteByID(ctx, id); err != nil {
return err
}
return s.lineageRepository.DeleteByURN(ctx, asset.URN)
}

if err := s.assetRepository.DeleteByURN(ctx, id); err != nil {
return err
}

return s.discoveryRepository.DeleteByURN(ctx, id)
if err := s.discoveryRepository.DeleteByURN(ctx, id); err != nil {
return err
}

return s.lineageRepository.DeleteByURN(ctx, id)
}

func (s *Service) GetAssetByID(ctx context.Context, id string) (ast Asset, err error) {
Expand Down
35 changes: 35 additions & 0 deletions core/asset/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,19 @@ func TestService_DeleteAsset(t *testing.T) {
}

var testCases = []testCase{
{
Description: `with ID, should return error if asset repository getAsset return error`,
ID: assetID,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository, lr *mocks.LineageRepository) {
ar.EXPECT().GetByID(ctx, assetID).Return(asset.Asset{}, errors.New("unknown error"))
},
Err: errors.New("unknown error"),
},
{
Description: `with ID, should return error if asset repository delete return error`,
ID: assetID,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository, lr *mocks.LineageRepository) {
ar.EXPECT().GetByID(ctx, assetID).Return(asset.Asset{URN: urn}, nil)
ar.EXPECT().DeleteByID(ctx, assetID).Return(errors.New("unknown error"))
},
Err: errors.New("unknown error"),
Expand All @@ -324,11 +333,23 @@ func TestService_DeleteAsset(t *testing.T) {
Description: `with ID, should return error if discovery repository delete return error`,
ID: assetID,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository, lr *mocks.LineageRepository) {
ar.EXPECT().GetByID(ctx, assetID).Return(asset.Asset{URN: urn}, nil)
ar.EXPECT().DeleteByID(ctx, assetID).Return(nil)
dr.EXPECT().DeleteByID(ctx, assetID).Return(errors.New("unknown error"))
},
Err: errors.New("unknown error"),
},
{
Description: `with ID, should return error if lineage repository delete return error`,
ID: assetID,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository, lr *mocks.LineageRepository) {
ar.EXPECT().GetByID(ctx, assetID).Return(asset.Asset{URN: urn}, nil)
ar.EXPECT().DeleteByID(ctx, assetID).Return(nil)
dr.EXPECT().DeleteByID(ctx, assetID).Return(nil)
lr.EXPECT().DeleteByURN(ctx, urn).Return(errors.New("unknown error"))
},
Err: errors.New("unknown error"),
},
{
Description: `with URN, should return error if asset repository delete return error`,
ID: urn,
Expand All @@ -341,17 +362,30 @@ func TestService_DeleteAsset(t *testing.T) {
Description: `with URN, should return error if discovery repository delete return error`,
ID: urn,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository, lr *mocks.LineageRepository) {

ar.EXPECT().DeleteByURN(ctx, urn).Return(nil)
dr.EXPECT().DeleteByURN(ctx, urn).Return(errors.New("unknown error"))
},
Err: errors.New("unknown error"),
},
{
Description: `with URN, should return error if lineage repository delete return error`,
ID: urn,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository, lr *mocks.LineageRepository) {
ar.EXPECT().DeleteByURN(ctx, urn).Return(nil)
dr.EXPECT().DeleteByURN(ctx, urn).Return(nil)
lr.EXPECT().DeleteByURN(ctx, urn).Return(errors.New("unknown error"))
},
Err: errors.New("unknown error"),
},
{
Description: `should call DeleteByID on repositories when given a UUID`,
ID: assetID,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository, lr *mocks.LineageRepository) {
ar.EXPECT().GetByID(ctx, assetID).Return(asset.Asset{URN: urn}, nil)
ar.EXPECT().DeleteByID(ctx, assetID).Return(nil)
dr.EXPECT().DeleteByID(ctx, assetID).Return(nil)
lr.EXPECT().DeleteByURN(ctx, urn).Return(nil)
},
Err: nil,
},
Expand All @@ -361,6 +395,7 @@ func TestService_DeleteAsset(t *testing.T) {
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository, lr *mocks.LineageRepository) {
ar.EXPECT().DeleteByURN(ctx, urn).Return(nil)
dr.EXPECT().DeleteByURN(ctx, urn).Return(nil)
lr.EXPECT().DeleteByURN(ctx, urn).Return(nil)
},
Err: nil,
},
Expand Down
24 changes: 24 additions & 0 deletions internal/store/postgres/lineage_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,30 @@ func (repo *LineageRepository) GetGraph(ctx context.Context, urn string, query a
return graph, nil
}

func (repo *LineageRepository) DeleteByURN(ctx context.Context, urn string) error {

deleteQuery, _, err := sq.Delete("lineage_graph").Where(
fmt.Sprintf("source='%s' or target='%s'", urn, urn)).ToSql()

if err != nil {
return fmt.Errorf("error building delete query when deleting asset with URN = %q: %w", urn, err)
}

res, err := repo.client.db.Exec(deleteQuery)
if err != nil {
return fmt.Errorf("error deleting asset with URN = %q: %w", urn, err)
}
affectedRows, err := res.RowsAffected()

if err != nil {
return fmt.Errorf("error getting affected rows: %w", err)
}
if affectedRows == 0 {
return asset.LineageNotFoundError{URN: urn}
}
return nil
}

// Upsert insert or delete connections of a given node by comparing them with current state
func (repo *LineageRepository) Upsert(ctx context.Context, urn string, upstreams, downstreams []string) error {
currentGraph, err := repo.getDirectLineage(ctx, urn)
Expand Down
23 changes: 23 additions & 0 deletions internal/store/postgres/lineage_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,29 @@ func (r *LineageRepositoryTestSuite) TestUpsert() {
})
}

func (r *LineageRepositoryTestSuite) TestDeleteByURN() {
r.Run("should delete asset from lineage", func() {
nodeURN := "table-1"

// create initial
err := r.repository.Upsert(r.ctx, nodeURN, []string{"table-2"}, []string{"table-3"})
r.NoError(err)

err = r.repository.DeleteByURN(r.ctx, nodeURN)
r.NoError(err)

graph, err := r.repository.GetGraph(r.ctx, nodeURN, asset.LineageQuery{})
r.Require().NoError(err)
r.compareGraphs(asset.LineageGraph{}, graph)
})

r.Run("delete when URN has no lineage", func() {
nodeURN := "table-1"
err := r.repository.DeleteByURN(r.ctx, nodeURN)
r.Equal(asset.LineageNotFoundError{URN: nodeURN}.Error(), err.Error())
})
}

func (r *LineageRepositoryTestSuite) compareGraphs(expected, actual asset.LineageGraph) {
expLen := len(expected)
r.Require().Len(actual, expLen)
Expand Down

0 comments on commit acb3c90

Please sign in to comment.