Skip to content

Commit

Permalink
feat: update api to be user-aware (#70)
Browse files Browse the repository at this point in the history
* feat(api): integrate user-aware middleware

* refactor: rebase to assets
  • Loading branch information
mabdh authored Feb 3, 2022
1 parent 501735c commit 730c813
Show file tree
Hide file tree
Showing 55 changed files with 807 additions and 250 deletions.
2 changes: 1 addition & 1 deletion api/handlers/asset_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (h *AssetHandler) Upsert(w http.ResponseWriter, r *http.Request) {
var ast asset.Asset
err := json.NewDecoder(r.Body).Decode(&ast)
if err != nil {
writeJSONError(w, http.StatusBadRequest, bodyParserErrorMsg(err))
WriteJSONError(w, http.StatusBadRequest, bodyParserErrorMsg(err))
return
}
if err := h.validateAsset(ast); err != nil {
Expand Down
50 changes: 25 additions & 25 deletions api/handlers/asset_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

"github.com/odpf/columbus/api/handlers"
"github.com/odpf/columbus/asset"
"github.com/odpf/columbus/lib/mock"
"github.com/odpf/columbus/lib/mocks"
"github.com/stretchr/testify/assert"
tmock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestAssetHandlerUpsert(t *testing.T) {

expectedErr := errors.New("unknown error")

ar := new(mock.AssetRepository)
ar := new(mocks.AssetRepository)
ar.On("Upsert", rr.Context(), tmock.AnythingOfType("*asset.Asset")).Return(expectedErr)
defer ar.AssertExpectations(t)

Expand All @@ -108,11 +108,11 @@ func TestAssetHandlerUpsert(t *testing.T) {

expectedErr := errors.New("unknown error")

ar := new(mock.AssetRepository)
ar := new(mocks.AssetRepository)
ar.On("Upsert", rr.Context(), tmock.AnythingOfType("*asset.Asset")).Return(nil)
defer ar.AssertExpectations(t)

dr := new(mock.DiscoveryRepository)
dr := new(mocks.DiscoveryRepository)
dr.On("Upsert", rr.Context(), tmock.AnythingOfType("asset.Asset")).Return(expectedErr)
defer dr.AssertExpectations(t)

Expand Down Expand Up @@ -141,14 +141,14 @@ func TestAssetHandlerUpsert(t *testing.T) {
rr := httptest.NewRequest("PUT", "/", strings.NewReader(validPayload))
rw := httptest.NewRecorder()

ar := new(mock.AssetRepository)
ar := new(mocks.AssetRepository)
ar.On("Upsert", rr.Context(), &ast).Return(nil).Run(func(args tmock.Arguments) {
argAsset := args.Get(1).(*asset.Asset)
argAsset.ID = assetWithID.ID
})
defer ar.AssertExpectations(t)

dr := new(mock.DiscoveryRepository)
dr := new(mocks.DiscoveryRepository)
dr.On("Upsert", rr.Context(), assetWithID).Return(nil)
defer dr.AssertExpectations(t)

Expand All @@ -171,7 +171,7 @@ func TestAssetHandlerDelete(t *testing.T) {
Description string
AssetID string
ExpectStatus int
Setup func(context.Context, *mock.AssetRepository, *mock.DiscoveryRepository)
Setup func(context.Context, *mocks.AssetRepository, *mocks.DiscoveryRepository)
PostCheck func(t *testing.T, tc *testCase, resp *http.Response) error
}

Expand All @@ -180,23 +180,23 @@ func TestAssetHandlerDelete(t *testing.T) {
Description: "should return 404 when asset cannot be found",
AssetID: "id-10",
ExpectStatus: http.StatusNotFound,
Setup: func(ctx context.Context, ar *mock.AssetRepository, dr *mock.DiscoveryRepository) {
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository) {
ar.On("Delete", ctx, "id-10").Return(asset.NotFoundError{AssetID: "id-10"})
},
},
{
Description: "should return 500 on error deleting asset",
AssetID: "id-10",
ExpectStatus: http.StatusInternalServerError,
Setup: func(ctx context.Context, ar *mock.AssetRepository, dr *mock.DiscoveryRepository) {
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository) {
ar.On("Delete", ctx, "id-10").Return(errors.New("error deleting asset"))
},
},
{
Description: "should return 500 on error deleting asset from discovery",
AssetID: "id-10",
ExpectStatus: http.StatusInternalServerError,
Setup: func(ctx context.Context, ar *mock.AssetRepository, dr *mock.DiscoveryRepository) {
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository) {
ar.On("Delete", ctx, "id-10").Return(nil)
dr.On("Delete", ctx, "id-10").Return(asset.NotFoundError{AssetID: "id-10"})
},
Expand All @@ -205,7 +205,7 @@ func TestAssetHandlerDelete(t *testing.T) {
Description: "should return 204 on success",
AssetID: "id-10",
ExpectStatus: http.StatusNoContent,
Setup: func(ctx context.Context, ar *mock.AssetRepository, dr *mock.DiscoveryRepository) {
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository) {
ar.On("Delete", ctx, "id-10").Return(nil)
dr.On("Delete", ctx, "id-10").Return(nil)
},
Expand All @@ -219,8 +219,8 @@ func TestAssetHandlerDelete(t *testing.T) {
"id": tc.AssetID,
})

ar := new(mock.AssetRepository)
dr := new(mock.DiscoveryRepository)
ar := new(mocks.AssetRepository)
dr := new(mocks.DiscoveryRepository)
tc.Setup(rr.Context(), ar, dr)
defer ar.AssertExpectations(t)

Expand All @@ -246,29 +246,29 @@ func TestAssetHandlerGetByID(t *testing.T) {
type testCase struct {
Description string
ExpectStatus int
Setup func(context.Context, *mock.AssetRepository)
Setup func(context.Context, *mocks.AssetRepository)
PostCheck func(resp *http.Response) error
}

var testCases = []testCase{
{
Description: `should return http 404 if asset doesn't exist`,
ExpectStatus: http.StatusNotFound,
Setup: func(ctx context.Context, ar *mock.AssetRepository) {
Setup: func(ctx context.Context, ar *mocks.AssetRepository) {
ar.On("GetByID", ctx, assetID).Return(asset.Asset{}, asset.NotFoundError{AssetID: assetID})
},
},
{
Description: `should return http 500 if fetching fails`,
ExpectStatus: http.StatusInternalServerError,
Setup: func(ctx context.Context, ar *mock.AssetRepository) {
Setup: func(ctx context.Context, ar *mocks.AssetRepository) {
ar.On("GetByID", ctx, assetID).Return(asset.Asset{}, errors.New("unknown error"))
},
},
{
Description: "should return http 200 status along with the asset, if found",
ExpectStatus: http.StatusOK,
Setup: func(ctx context.Context, ar *mock.AssetRepository) {
Setup: func(ctx context.Context, ar *mocks.AssetRepository) {
ar.On("GetByID", ctx, assetID).Return(ast, nil)
},
PostCheck: func(r *http.Response) error {
Expand All @@ -291,7 +291,7 @@ func TestAssetHandlerGetByID(t *testing.T) {
rr = mux.SetURLVars(rr, map[string]string{
"id": assetID,
})
ar := new(mock.AssetRepository)
ar := new(mocks.AssetRepository)
tc.Setup(rr.Context(), ar)

handler := handlers.NewAssetHandler(logger, ar, nil)
Expand All @@ -316,23 +316,23 @@ func TestAssetHandlerGet(t *testing.T) {
Description string
Querystring string
ExpectStatus int
Setup func(context.Context, *mock.AssetRepository)
Setup func(context.Context, *mocks.AssetRepository)
PostCheck func(resp *http.Response) error
}

var testCases = []testCase{
{
Description: `should return http 500 if fetching fails`,
ExpectStatus: http.StatusInternalServerError,
Setup: func(ctx context.Context, ar *mock.AssetRepository) {
Setup: func(ctx context.Context, ar *mocks.AssetRepository) {
ar.On("Get", ctx, asset.Config{}).Return([]asset.Asset{}, errors.New("unknown error"))
},
},
{
Description: `should return http 500 if fetching total fails`,
Querystring: "?with_total=1",
ExpectStatus: http.StatusInternalServerError,
Setup: func(ctx context.Context, ar *mock.AssetRepository) {
Setup: func(ctx context.Context, ar *mocks.AssetRepository) {
ar.On("Get", ctx, asset.Config{}).Return([]asset.Asset{}, nil)
ar.On("GetCount", ctx, asset.Config{}).Return(0, errors.New("unknown error"))
},
Expand All @@ -341,7 +341,7 @@ func TestAssetHandlerGet(t *testing.T) {
Description: `should parse querystring to get config`,
Querystring: "?text=asd&type=table&service=bigquery&size=30&offset=50",
ExpectStatus: http.StatusOK,
Setup: func(ctx context.Context, ar *mock.AssetRepository) {
Setup: func(ctx context.Context, ar *mocks.AssetRepository) {
ar.On("Get", ctx, asset.Config{
Text: "asd",
Type: "table",
Expand All @@ -354,7 +354,7 @@ func TestAssetHandlerGet(t *testing.T) {
{
Description: "should return http 200 status along with list of assets",
ExpectStatus: http.StatusOK,
Setup: func(ctx context.Context, ar *mock.AssetRepository) {
Setup: func(ctx context.Context, ar *mocks.AssetRepository) {
ar.On("Get", ctx, asset.Config{}).Return([]asset.Asset{
{ID: "testid-1"},
{ID: "testid-2"},
Expand Down Expand Up @@ -386,7 +386,7 @@ func TestAssetHandlerGet(t *testing.T) {
Description: "should return total in the payload if with_total flag is given",
ExpectStatus: http.StatusOK,
Querystring: "?with_total=true&text=dsa&type=job&service=kafka&size=10&offset=5",
Setup: func(ctx context.Context, ar *mock.AssetRepository) {
Setup: func(ctx context.Context, ar *mocks.AssetRepository) {
ar.On("Get", ctx, asset.Config{
Text: "dsa",
Type: "job",
Expand Down Expand Up @@ -435,7 +435,7 @@ func TestAssetHandlerGet(t *testing.T) {
rr := httptest.NewRequest("GET", "/"+tc.Querystring, nil)
rw := httptest.NewRecorder()

ar := new(mock.AssetRepository)
ar := new(mocks.AssetRepository)
tc.Setup(rr.Context(), ar)

handler := handlers.NewAssetHandler(logger, ar, nil)
Expand Down
4 changes: 2 additions & 2 deletions api/handlers/default_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
)

func NotFound(w http.ResponseWriter, r *http.Request) {
writeJSONError(w, http.StatusNotFound, mux.ErrNotFound.Error())
WriteJSONError(w, http.StatusNotFound, mux.ErrNotFound.Error())
}

func MethodNotAllowed(w http.ResponseWriter, r *http.Request) {
writeJSONError(w, http.StatusMethodNotAllowed, mux.ErrMethodMismatch.Error())
WriteJSONError(w, http.StatusMethodNotAllowed, mux.ErrMethodMismatch.Error())
}
2 changes: 1 addition & 1 deletion api/handlers/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func internalServerError(w http.ResponseWriter, logger log.Logger, msg string) {
writeJSON(w, http.StatusInternalServerError, response)
}

func writeJSONError(w http.ResponseWriter, status int, msg string) {
func WriteJSONError(w http.ResponseWriter, status int, msg string) {
response := &ErrorResponse{
Reason: msg,
}
Expand Down
4 changes: 2 additions & 2 deletions api/handlers/lineage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (handler *LineageHandler) GetLineage(w http.ResponseWriter, r *http.Request
if err != nil {
handler.logger.Error("error requesting graph", "error", err)
status := http.StatusInternalServerError
writeJSONError(w, status, http.StatusText(status))
WriteJSONError(w, status, http.StatusText(status))
return
}
requestParams := mux.Vars(r)
Expand All @@ -48,7 +48,7 @@ func (handler *LineageHandler) GetLineage(w http.ResponseWriter, r *http.Request
if err != nil {
handler.logger.Error("error querying graph", "query", opts, "error", err)
status := http.StatusBadRequest
writeJSONError(w, status, err.Error())
WriteJSONError(w, status, err.Error())
return
}

Expand Down
6 changes: 3 additions & 3 deletions api/handlers/lineage_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/gorilla/mux"
"github.com/odpf/columbus/api/handlers"
"github.com/odpf/columbus/lib/mock"
"github.com/odpf/columbus/lib/mocks"
"github.com/odpf/columbus/lib/set"
"github.com/odpf/columbus/lineage"
)
Expand All @@ -38,10 +38,10 @@ func TestLineageHandler(t *testing.T) {
},
}

graph := new(mock.Graph)
graph := new(mocks.Graph)
graph.On("Query", lineage.QueryCfg{Root: "asset-A"}).Return(subGraph, nil)

lp := new(mock.LineageProvider)
lp := new(mocks.LineageProvider)
lp.On("Graph").Return(graph, nil)

handler := handlers.NewLineageHandler(logger, lp)
Expand Down
24 changes: 12 additions & 12 deletions api/handlers/record_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (h *RecordHandler) Delete(w http.ResponseWriter, r *http.Request) {

typName := asset.Type(typeName)
if !typName.IsValid() {
writeJSONError(w, http.StatusNotFound, "type is invalid")
WriteJSONError(w, http.StatusNotFound, "type is invalid")
return
}

Expand All @@ -62,7 +62,7 @@ func (h *RecordHandler) Delete(w http.ResponseWriter, r *http.Request) {
errMessage = err.Error()
}

writeJSONError(w, statusCode, errMessage)
WriteJSONError(w, statusCode, errMessage)
return
}

Expand All @@ -76,13 +76,13 @@ func (h *RecordHandler) UpsertBulk(w http.ResponseWriter, r *http.Request) {
var assets []asset.Asset
err := json.NewDecoder(r.Body).Decode(&assets)
if err != nil {
writeJSONError(w, http.StatusBadRequest, bodyParserErrorMsg(err))
WriteJSONError(w, http.StatusBadRequest, bodyParserErrorMsg(err))
return
}

typName := asset.Type(typeName)
if !typName.IsValid() {
writeJSONError(w, http.StatusNotFound, "type is invalid")
WriteJSONError(w, http.StatusNotFound, "type is invalid")
return
}

Expand All @@ -101,7 +101,7 @@ func (h *RecordHandler) UpsertBulk(w http.ResponseWriter, r *http.Request) {
if err := h.discoveryService.Upsert(r.Context(), typName.String(), assets); err != nil {
h.logger.Error("error creating/updating assets", "type", typName, "error", err)
status := http.StatusInternalServerError
writeJSONError(w, status, http.StatusText(status))
WriteJSONError(w, status, http.StatusText(status))
return
}
h.logger.Info("created/updated assets", "record count", len(assets), "type", typName)
Expand All @@ -113,28 +113,28 @@ func (h *RecordHandler) GetByType(w http.ResponseWriter, r *http.Request) {

typName := asset.Type(typeName)
if !typName.IsValid() {
writeJSONError(w, http.StatusNotFound, "type is invalid")
WriteJSONError(w, http.StatusNotFound, "type is invalid")
return
}

recordRepo, err := h.recordRepositoryFactory.For(typName.String())
if err != nil {
h.logger.Error("error constructing record repository", "type", typName, "error", err)
status, message := h.responseStatusForError(err)
writeJSONError(w, status, message)
WriteJSONError(w, status, message)
return
}
getCfg, err := h.buildGetConfig(r.URL.Query())
if err != nil {
writeJSONError(w, http.StatusBadRequest, err.Error())
WriteJSONError(w, http.StatusBadRequest, err.Error())
return
}

recordList, err := recordRepo.GetAll(r.Context(), getCfg)
if err != nil {
h.logger.Error("error fetching assets: GetAll", "type", typName, "error", err)
status, message := h.responseStatusForError(err)
writeJSONError(w, status, message)
WriteJSONError(w, status, message)
return
}

Expand All @@ -154,23 +154,23 @@ func (h *RecordHandler) GetOneByType(w http.ResponseWriter, r *http.Request) {

typName := asset.Type(typeName)
if !typName.IsValid() {
writeJSONError(w, http.StatusNotFound, "type is invalid")
WriteJSONError(w, http.StatusNotFound, "type is invalid")
return
}

recordRepo, err := h.recordRepositoryFactory.For(typName.String())
if err != nil {
h.logger.Error("internal: error construing record repository", "type", typName, "error", err)
status := http.StatusInternalServerError
writeJSONError(w, status, http.StatusText(status))
WriteJSONError(w, status, http.StatusText(status))
return
}

record, err := recordRepo.GetByID(r.Context(), recordID)
if err != nil {
h.logger.Error("error fetching record", "type", typName, "record id", recordID, "error", err)
status, message := h.responseStatusForError(err)
writeJSONError(w, status, message)
WriteJSONError(w, status, message)
return
}
writeJSON(w, http.StatusOK, record)
Expand Down
Loading

0 comments on commit 730c813

Please sign in to comment.