Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lineage): delete lineage for the asset when deleting asset. #220

Merged
merged 1 commit into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core/asset/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ type NotFoundError struct {
URN string
}

type LineageNotFoundError struct {
URN string
}

func (err LineageNotFoundError) Error() string {
if err.URN != "" {
return fmt.Sprintf("no lineage found for record: %q", err.URN)
}
return "could not find lineage"
}

func (err NotFoundError) Error() string {
if err.AssetID != "" {
return fmt.Sprintf("no such record: %q", err.AssetID)
Expand Down
1 change: 1 addition & 0 deletions core/asset/lineage.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type LineageQuery struct {
type LineageRepository interface {
GetGraph(ctx context.Context, urn string, query LineageQuery) (LineageGraph, error)
Upsert(ctx context.Context, urn string, upstreams, downstreams []string) error
DeleteByURN(ctx context.Context, urn string) error
}

type LineageGraph []LineageEdge
Expand Down
40 changes: 39 additions & 1 deletion core/asset/mocks/lineage_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 13 additions & 3 deletions core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,28 @@ func (s *Service) UpsertAssetWithoutLineage(ctx context.Context, ast *Asset) (st

func (s *Service) DeleteAsset(ctx context.Context, id string) error {
if isValidUUID(id) {
asset, err := s.assetRepository.GetByID(ctx, id)
if err != nil {
return err
}
if err := s.assetRepository.DeleteByID(ctx, id); err != nil {
return err
}

return s.discoveryRepository.DeleteByID(ctx, id)
if err := s.discoveryRepository.DeleteByID(ctx, id); err != nil {
return err
}
return s.lineageRepository.DeleteByURN(ctx, asset.URN)
}

if err := s.assetRepository.DeleteByURN(ctx, id); err != nil {
return err
}

return s.discoveryRepository.DeleteByURN(ctx, id)
if err := s.discoveryRepository.DeleteByURN(ctx, id); err != nil {
return err
}

return s.lineageRepository.DeleteByURN(ctx, id)
}

func (s *Service) GetAssetByID(ctx context.Context, id string) (ast Asset, err error) {
Expand Down
35 changes: 35 additions & 0 deletions core/asset/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,19 @@ func TestService_DeleteAsset(t *testing.T) {
}

var testCases = []testCase{
{
Description: `with ID, should return error if asset repository getAsset return error`,
ID: assetID,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository, lr *mocks.LineageRepository) {
ar.EXPECT().GetByID(ctx, assetID).Return(asset.Asset{}, errors.New("unknown error"))
},
Err: errors.New("unknown error"),
},
{
Description: `with ID, should return error if asset repository delete return error`,
ID: assetID,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository, lr *mocks.LineageRepository) {
ar.EXPECT().GetByID(ctx, assetID).Return(asset.Asset{URN: urn}, nil)
ar.EXPECT().DeleteByID(ctx, assetID).Return(errors.New("unknown error"))
},
Err: errors.New("unknown error"),
Expand All @@ -324,11 +333,23 @@ func TestService_DeleteAsset(t *testing.T) {
Description: `with ID, should return error if discovery repository delete return error`,
ID: assetID,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository, lr *mocks.LineageRepository) {
ar.EXPECT().GetByID(ctx, assetID).Return(asset.Asset{URN: urn}, nil)
ar.EXPECT().DeleteByID(ctx, assetID).Return(nil)
dr.EXPECT().DeleteByID(ctx, assetID).Return(errors.New("unknown error"))
},
Err: errors.New("unknown error"),
},
{
Description: `with ID, should return error if lineage repository delete return error`,
ID: assetID,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository, lr *mocks.LineageRepository) {
ar.EXPECT().GetByID(ctx, assetID).Return(asset.Asset{URN: urn}, nil)
ar.EXPECT().DeleteByID(ctx, assetID).Return(nil)
dr.EXPECT().DeleteByID(ctx, assetID).Return(nil)
lr.EXPECT().DeleteByURN(ctx, urn).Return(errors.New("unknown error"))
},
Err: errors.New("unknown error"),
},
{
Description: `with URN, should return error if asset repository delete return error`,
ID: urn,
Expand All @@ -341,17 +362,30 @@ func TestService_DeleteAsset(t *testing.T) {
Description: `with URN, should return error if discovery repository delete return error`,
ID: urn,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository, lr *mocks.LineageRepository) {

ar.EXPECT().DeleteByURN(ctx, urn).Return(nil)
dr.EXPECT().DeleteByURN(ctx, urn).Return(errors.New("unknown error"))
},
Err: errors.New("unknown error"),
},
{
Description: `with URN, should return error if lineage repository delete return error`,
ID: urn,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository, lr *mocks.LineageRepository) {
ar.EXPECT().DeleteByURN(ctx, urn).Return(nil)
dr.EXPECT().DeleteByURN(ctx, urn).Return(nil)
lr.EXPECT().DeleteByURN(ctx, urn).Return(errors.New("unknown error"))
},
Err: errors.New("unknown error"),
},
{
Description: `should call DeleteByID on repositories when given a UUID`,
ID: assetID,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository, lr *mocks.LineageRepository) {
ar.EXPECT().GetByID(ctx, assetID).Return(asset.Asset{URN: urn}, nil)
ar.EXPECT().DeleteByID(ctx, assetID).Return(nil)
dr.EXPECT().DeleteByID(ctx, assetID).Return(nil)
lr.EXPECT().DeleteByURN(ctx, urn).Return(nil)
},
Err: nil,
},
Expand All @@ -361,6 +395,7 @@ func TestService_DeleteAsset(t *testing.T) {
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository, lr *mocks.LineageRepository) {
ar.EXPECT().DeleteByURN(ctx, urn).Return(nil)
dr.EXPECT().DeleteByURN(ctx, urn).Return(nil)
lr.EXPECT().DeleteByURN(ctx, urn).Return(nil)
},
Err: nil,
},
Expand Down
24 changes: 24 additions & 0 deletions internal/store/postgres/lineage_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,30 @@ func (repo *LineageRepository) GetGraph(ctx context.Context, urn string, query a
return graph, nil
}

func (repo *LineageRepository) DeleteByURN(ctx context.Context, urn string) error {

deleteQuery, _, err := sq.Delete("lineage_graph").Where(
fmt.Sprintf("source='%s' or target='%s'", urn, urn)).ToSql()

if err != nil {
return fmt.Errorf("error building delete query when deleting asset with URN = %q: %w", urn, err)
}

res, err := repo.client.db.Exec(deleteQuery)
if err != nil {
return fmt.Errorf("error deleting asset with URN = %q: %w", urn, err)
}
affectedRows, err := res.RowsAffected()

if err != nil {
return fmt.Errorf("error getting affected rows: %w", err)
}
if affectedRows == 0 {
return asset.LineageNotFoundError{URN: urn}
}
return nil
}

// Upsert insert or delete connections of a given node by comparing them with current state
func (repo *LineageRepository) Upsert(ctx context.Context, urn string, upstreams, downstreams []string) error {
currentGraph, err := repo.getDirectLineage(ctx, urn)
Expand Down
23 changes: 23 additions & 0 deletions internal/store/postgres/lineage_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,29 @@ func (r *LineageRepositoryTestSuite) TestUpsert() {
})
}

func (r *LineageRepositoryTestSuite) TestDeleteByURN() {
r.Run("should delete asset from lineage", func() {
nodeURN := "table-1"

// create initial
err := r.repository.Upsert(r.ctx, nodeURN, []string{"table-2"}, []string{"table-3"})
r.NoError(err)

err = r.repository.DeleteByURN(r.ctx, nodeURN)
r.NoError(err)

graph, err := r.repository.GetGraph(r.ctx, nodeURN, asset.LineageQuery{})
r.Require().NoError(err)
r.compareGraphs(asset.LineageGraph{}, graph)
})

r.Run("delete when URN has no lineage", func() {
nodeURN := "table-1"
err := r.repository.DeleteByURN(r.ctx, nodeURN)
r.Equal(asset.LineageNotFoundError{URN: nodeURN}.Error(), err.Error())
})
}

func (r *LineageRepositoryTestSuite) compareGraphs(expected, actual asset.LineageGraph) {
expLen := len(expected)
r.Require().Len(actual, expLen)
Expand Down