Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Patch asset's lineage #195

Merged
merged 1 commit into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
NAME="github.com/odpf/compass"
VERSION=$(shell git describe --always --tags 2>/dev/null)
COVERFILE="/tmp/compass.coverprofile"
PROTON_COMMIT := "838f2a8c9ddc8fa6dfbd6f3ebe6201e76e2368f2"
PROTON_COMMIT := "c7639b42da0679b2340a52155d2fe577b9d45aa2"
.PHONY: all build test clean install proto

all: build
Expand Down
24 changes: 15 additions & 9 deletions core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,27 @@ func (s *Service) GetAllAssets(ctx context.Context, flt Filter, withTotal bool)
}

func (s *Service) UpsertAsset(ctx context.Context, ast *Asset, upstreams, downstreams []string) (string, error) {
var assetID string
var err error
assetID, err := s.UpsertAssetWithoutLineage(ctx, ast)
if err != nil {
return "", err
}

if err := s.lineageRepository.Upsert(ctx, ast.URN, upstreams, downstreams); err != nil {
return "", err
}

assetID, err = s.assetRepository.Upsert(ctx, ast)
return assetID, nil
}

func (s *Service) UpsertAssetWithoutLineage(ctx context.Context, ast *Asset) (string, error) {
assetID, err := s.assetRepository.Upsert(ctx, ast)
if err != nil {
return assetID, err
return "", err
}

ast.ID = assetID
if err := s.discoveryRepository.Upsert(ctx, *ast); err != nil {
return assetID, err
}

if err := s.lineageRepository.Upsert(ctx, ast.URN, upstreams, downstreams); err != nil {
return assetID, err
return "", err
}

return assetID, nil
Expand Down
66 changes: 62 additions & 4 deletions core/asset/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,70 @@ func TestService_UpsertAsset(t *testing.T) {

svc := asset.NewService(mockAssetRepo, mockDiscoveryRepo, mockLineageRepo)
rid, err := svc.UpsertAsset(ctx, tc.Asset, tc.Upstreams, tc.Downstreams)
if err != nil && errors.Is(tc.Err, err) {
t.Fatalf("got error %v, expected error was %v", err, tc.Err)
if tc.Err != nil {
assert.EqualError(t, err, tc.Err.Error())
return
}
assert.NoError(t, err)
assert.Equal(t, tc.ReturnedID, rid)
})
}
}

func TestService_UpsertAssetWithoutLineage(t *testing.T) {
sampleAsset := &asset.Asset{ID: "some-id", URN: "some-urn", Type: asset.TypeDashboard, Service: "some-service"}
var testCases = []struct {
Description string
Asset *asset.Asset
Err error
ReturnedID string
Setup func(context.Context, *mocks.AssetRepository, *mocks.DiscoveryRepository)
}{
{
Description: `should return error if asset repository upsert return error`,
Asset: sampleAsset,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository) {
ar.EXPECT().Upsert(ctx, sampleAsset).Return("", errors.New("unknown error"))
},
Err: errors.New("unknown error"),
},
{
Description: `should return error if discovery repository upsert return error`,
Asset: sampleAsset,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository) {
ar.EXPECT().Upsert(ctx, sampleAsset).Return(sampleAsset.ID, nil)
dr.EXPECT().Upsert(ctx, *sampleAsset).Return(errors.New("unknown error"))
},
Err: errors.New("unknown error"),
},
{
Description: `should return no error if all repositories upsert return no error`,
Asset: sampleAsset,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository) {
ar.EXPECT().Upsert(ctx, sampleAsset).Return(sampleAsset.ID, nil)
dr.EXPECT().Upsert(ctx, *sampleAsset).Return(nil)
},
ReturnedID: sampleAsset.ID,
},
}
for _, tc := range testCases {
t.Run(tc.Description, func(t *testing.T) {
ctx := context.Background()

mockAssetRepo := mocks.NewAssetRepository(t)
mockDiscoveryRepo := mocks.NewDiscoveryRepository(t)
if tc.Setup != nil {
tc.Setup(ctx, mockAssetRepo, mockDiscoveryRepo)
}
if tc.ReturnedID != rid {
t.Fatalf("got returned id %v, expected returned id was %v", rid, tc.ReturnedID)

svc := asset.NewService(mockAssetRepo, mockDiscoveryRepo, mocks.NewLineageRepository(t))
rid, err := svc.UpsertAssetWithoutLineage(ctx, tc.Asset)
if tc.Err != nil {
assert.EqualError(t, err, tc.Err.Error())
return
}
assert.NoError(t, err)
assert.Equal(t, tc.ReturnedID, rid)
})
}
}
Expand Down
46 changes: 39 additions & 7 deletions internal/server/v1beta1/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type AssetService interface {
GetAssetByVersion(ctx context.Context, id string, version string) (asset.Asset, error)
GetAssetVersionHistory(ctx context.Context, flt asset.Filter, id string) ([]asset.Asset, error)
UpsertAsset(ctx context.Context, ast *asset.Asset, upstreams, downstreams []string) (string, error)
UpsertAssetWithoutLineage(ctx context.Context, ast *asset.Asset) (string, error)
DeleteAsset(ctx context.Context, id string) error

GetLineage(ctx context.Context, urn string, query asset.LineageQuery) (asset.Lineage, error)
Expand Down Expand Up @@ -268,13 +269,14 @@ func (server *APIServer) UpsertPatchAsset(ctx context.Context, req *compassv1bet
ast.Patch(patchAssetMap)
ast.UpdatedBy.ID = userID

assetID, err := server.upsertAsset(
ctx,
ast,
"asset_upsert_patch",
req.GetUpstreams(),
req.GetDownstreams(),
)
var assetID string
if len(req.Upstreams) != 0 || len(req.Downstreams) != 0 || req.OverwriteLineage {
assetID, err = server.upsertAsset(
ctx, ast, "asset_upsert_patch", req.GetUpstreams(), req.GetDownstreams(),
)
} else {
assetID, err = server.upsertAssetWithoutLineage(ctx, ast)
}
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -382,6 +384,36 @@ func (server *APIServer) upsertAsset(
return
}

func (server *APIServer) upsertAssetWithoutLineage(ctx context.Context, ast asset.Asset) (string, error) {
const mode = "asset_upsert_patch_without_lineage"

if err := server.validateAsset(ast); err != nil {
return "", status.Error(codes.InvalidArgument, err.Error())
}

assetID, err := server.assetService.UpsertAssetWithoutLineage(ctx, &ast)
if err != nil {
switch {
case errors.As(err, new(asset.InvalidError)):
return "", status.Error(codes.InvalidArgument, err.Error())

case errors.As(err, new(asset.DiscoveryError)):
server.sendStatsDCounterMetric("discovery_error",
map[string]string{
"method": mode,
})
}

return "", internalServerError(server.logger, err.Error())
}

server.sendStatsDCounterMetric(mode, map[string]string{
"type": ast.Type.String(),
"service": ast.Service,
})
return assetID, nil
}

func (server *APIServer) buildAsset(baseAsset *compassv1beta1.UpsertAssetRequest_Asset) asset.Asset {
ast := asset.Asset{
URN: baseAsset.GetUrn(),
Expand Down
91 changes: 91 additions & 0 deletions internal/server/v1beta1/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,97 @@ func TestUpsertPatchAsset(t *testing.T) {

},
},
{
Description: "without explicit overwrite_lineage, should upsert asset without lineage",
Setup: func(ctx context.Context, as *mocks.AssetService) {
patchedAsset := asset.Asset{
URN: "test dagger",
Type: asset.TypeTable,
Name: "new-name",
Service: "kafka",
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
Owners: []user.User{{ID: "id", UUID: "", Email: "email@email.com", Provider: "provider"}},
}

assetWithID := patchedAsset
assetWithID.ID = assetID

as.EXPECT().GetAssetByID(ctx, "test dagger").Return(currentAsset, nil)
as.EXPECT().UpsertAssetWithoutLineage(ctx, &patchedAsset).
Return(assetWithID.ID, nil).
Run(func(ctx context.Context, ast *asset.Asset) {
patchedAsset.ID = assetWithID.ID
})
},
Request: &compassv1beta1.UpsertPatchAssetRequest{
Asset: &compassv1beta1.UpsertPatchAssetRequest_Asset{
Urn: "test dagger",
Type: "table",
Name: wrapperspb.String("new-name"),
Service: "kafka",
Data: &structpb.Struct{},
Owners: []*compassv1beta1.User{{Id: "id", Uuid: "", Email: "email@email.com", Provider: "provider"}},
},
},
ExpectStatus: codes.OK,
PostCheck: func(resp *compassv1beta1.UpsertPatchAssetResponse) error {
expected := &compassv1beta1.UpsertPatchAssetResponse{
Id: assetID,
}
if diff := cmp.Diff(resp, expected, protocmp.Transform()); diff != "" {
return fmt.Errorf("expected response to be %+v, was %+v", expected, resp)
}
return nil

},
},
{
Description: "with explicit overwrite_lineage, should upsert asset when lineage is not in the request",
Setup: func(ctx context.Context, as *mocks.AssetService) {
patchedAsset := asset.Asset{
URN: "test dagger",
Type: asset.TypeTable,
Name: "new-name",
Service: "kafka",
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
Owners: []user.User{{ID: "id", UUID: "", Email: "email@email.com", Provider: "provider"}},
}

assetWithID := patchedAsset
assetWithID.ID = assetID

as.EXPECT().GetAssetByID(ctx, "test dagger").Return(currentAsset, nil)
as.EXPECT().UpsertAsset(ctx, &patchedAsset, []string{}, []string{}).
Return(assetWithID.ID, nil).
Run(func(ctx context.Context, ast *asset.Asset, _, _ []string) {
patchedAsset.ID = assetWithID.ID
})
},
Request: &compassv1beta1.UpsertPatchAssetRequest{
Asset: &compassv1beta1.UpsertPatchAssetRequest_Asset{
Urn: "test dagger",
Type: "table",
Name: wrapperspb.String("new-name"),
Service: "kafka",
Data: &structpb.Struct{},
Owners: []*compassv1beta1.User{{Id: "id", Uuid: "", Email: "email@email.com", Provider: "provider"}},
},
OverwriteLineage: true,
},
ExpectStatus: codes.OK,
PostCheck: func(resp *compassv1beta1.UpsertPatchAssetResponse) error {
expected := &compassv1beta1.UpsertPatchAssetResponse{
Id: assetID,
}
if diff := cmp.Diff(resp, expected, protocmp.Transform()); diff != "" {
return fmt.Errorf("expected response to be %+v, was %+v", expected, resp)
}
return nil

},
},
}
for _, tc := range testCases {
t.Run(tc.Description, func(t *testing.T) {
Expand Down
45 changes: 45 additions & 0 deletions internal/server/v1beta1/mocks/asset_service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions proto/compass.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2278,6 +2278,13 @@ definitions:
type: array
items:
$ref: '#/definitions/LineageNode'
overwrite_lineage:
type: boolean
description: |-
overwrite_lineage determines whether the asset's lineage should be
overwritten with the upstreams and downstreams specified in the request.
Currently, it is only applicable when both upstreams and downstreams are
empty/not specified.
upstreams:
type: array
items:
Expand Down
Loading