Skip to content

Commit

Permalink
refactor(types): clean up dynamic types (#51)
Browse files Browse the repository at this point in the history
* feat: add explicit supported type names and es migration

* refactor: remove type creation API and update swagger

* refactor(types): add String method to type name and classification

* refactor(types): remove type classification

* fix(cmd): update error fmt to error wrap in migrate.go

* refactor(types): simplify the use of type struct to typename

* refactor(types): remove API GET /types/{name}

* refactor(types): removing meta index

* refactor(types): merge GetAll and GetRecordsCount in type repository

* refactor(types): check isValid types in GetByName type repository

* refactor(types): remove all functions in type repository except get all
  • Loading branch information
mabdh authored Jan 11, 2022
1 parent 48a51b7 commit 8111b08
Show file tree
Hide file tree
Showing 27 changed files with 448 additions and 1,393 deletions.
63 changes: 23 additions & 40 deletions api/handlers/record_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package handlers

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
Expand Down Expand Up @@ -47,20 +46,16 @@ func (h *RecordHandler) Delete(w http.ResponseWriter, r *http.Request) {
errMessage = fmt.Sprintf("error deleting record \"%s\" with type \"%s\"", recordID, typeName)
)

t, err := h.typeRepository.GetByName(r.Context(), typeName)
if errors.As(err, new(record.ErrNoSuchType)) {
writeJSONError(w, http.StatusNotFound, bodyParserErrorMsg(err))
return
}
if err != nil {
writeJSONError(w, http.StatusBadRequest, bodyParserErrorMsg(err))
typName := record.TypeName(typeName)
if err := typName.IsValid(); err != nil {
writeJSONError(w, http.StatusNotFound, err.Error())
return
}

err = h.discoveryService.DeleteRecord(r.Context(), t.Name, recordID)
err := h.discoveryService.DeleteRecord(r.Context(), typName.String(), recordID)
if err != nil {
h.logger.
Errorf("error deleting record \"%s\": %v", t.Name, err)
Errorf("error deleting record \"%s\": %v", typName, err)

if _, ok := err.(record.ErrNoSuchRecord); ok {
statusCode = http.StatusNotFound
Expand All @@ -71,7 +66,7 @@ func (h *RecordHandler) Delete(w http.ResponseWriter, r *http.Request) {
return
}

h.logger.Infof("deleted record \"%s\" with type \"%s\"", recordID, t.Name)
h.logger.Infof("deleted record \"%s\" with type \"%s\"", recordID, typName)
writeJSON(w, http.StatusNoContent, "success")
}

Expand All @@ -85,20 +80,16 @@ func (h *RecordHandler) UpsertBulk(w http.ResponseWriter, r *http.Request) {
return
}

t, err := h.typeRepository.GetByName(r.Context(), typeName)
if errors.As(err, new(record.ErrNoSuchType)) {
typName := record.TypeName(typeName)
if err := typName.IsValid(); err != nil {
writeJSONError(w, http.StatusNotFound, err.Error())
return
}
if err != nil {
writeJSONError(w, http.StatusBadRequest, bodyParserErrorMsg(err))
return
}

var failedRecords = make(map[int]string)
for idx, record := range records {
if err := h.validateRecord(record); err != nil {
h.logger.WithField("type", t.Name).
h.logger.WithField("type", typName).
WithField("record", record).
Errorf("error validating record: %v", err)
failedRecords[idx] = err.Error()
Expand All @@ -109,34 +100,30 @@ func (h *RecordHandler) UpsertBulk(w http.ResponseWriter, r *http.Request) {
return
}

if err := h.discoveryService.Upsert(r.Context(), t.Name, records); err != nil {
h.logger.WithField("type", t.Name).
if err := h.discoveryService.Upsert(r.Context(), typName.String(), records); err != nil {
h.logger.WithField("type", typName).
Errorf("error creating/updating records: %v", err)

status := http.StatusInternalServerError
writeJSONError(w, status, http.StatusText(status))
return
}
h.logger.Infof("created/updated %d records for %q type", len(records), t.Name)
h.logger.Infof("created/updated %d records for %q type", len(records), typName)
writeJSON(w, http.StatusOK, StatusResponse{Status: "success"})
}

func (h *RecordHandler) GetByType(w http.ResponseWriter, r *http.Request) {
typeName := mux.Vars(r)["name"]

t, err := h.typeRepository.GetByName(r.Context(), typeName)
if errors.As(err, new(record.ErrNoSuchType)) {
typName := record.TypeName(typeName)
if err := typName.IsValid(); err != nil {
writeJSONError(w, http.StatusNotFound, err.Error())
return
}
if err != nil {
writeJSONError(w, http.StatusBadRequest, bodyParserErrorMsg(err))
return
}

recordRepo, err := h.recordRepositoryFactory.For(t.Name)
recordRepo, err := h.recordRepositoryFactory.For(typName.String())
if err != nil {
h.logger.WithField("type", t.Name).
h.logger.WithField("type", typName).
Errorf("error constructing record repository: %v", err)
status, message := h.responseStatusForError(err)
writeJSONError(w, status, message)
Expand All @@ -150,7 +137,7 @@ func (h *RecordHandler) GetByType(w http.ResponseWriter, r *http.Request) {

recordList, err := recordRepo.GetAll(r.Context(), getCfg)
if err != nil {
h.logger.WithField("type", t.Name).
h.logger.WithField("type", typName).
Errorf("error fetching records: GetAll: %v", err)
status, message := h.responseStatusForError(err)
writeJSONError(w, status, message)
Expand All @@ -171,19 +158,15 @@ func (h *RecordHandler) GetOneByType(w http.ResponseWriter, r *http.Request) {
recordID = vars["id"]
)

t, err := h.typeRepository.GetByName(r.Context(), typeName)
if errors.As(err, new(record.ErrNoSuchType)) {
writeJSONError(w, http.StatusNotFound, bodyParserErrorMsg(err))
return
}
if err != nil {
writeJSONError(w, http.StatusBadRequest, bodyParserErrorMsg(err))
typName := record.TypeName(typeName)
if err := typName.IsValid(); err != nil {
writeJSONError(w, http.StatusNotFound, err.Error())
return
}

recordRepo, err := h.recordRepositoryFactory.For(t.Name)
recordRepo, err := h.recordRepositoryFactory.For(typName.String())
if err != nil {
h.logger.WithField("type", t.Name).
h.logger.WithField("type", typName).
Errorf("internal: error construing record repository: %v", err)

status := http.StatusInternalServerError
Expand All @@ -193,7 +176,7 @@ func (h *RecordHandler) GetOneByType(w http.ResponseWriter, r *http.Request) {

record, err := recordRepo.GetByID(r.Context(), recordID)
if err != nil {
h.logger.WithField("type", t.Name).
h.logger.WithField("type", typName).
WithField("record", recordID).
Errorf("error fetching record: %v", err)

Expand Down
5 changes: 2 additions & 3 deletions api/handlers/record_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@ import (
func TestRecordHandler(t *testing.T) {
var (
ctx = tmock.AnythingOfType("*context.valueCtx")
typeName = "existing-type"
typeName = record.TypeNameTable.String()
)

tr := new(mock.TypeRepository)
tr.On("GetByName", ctx, typeName).Return(record.Type{Name: typeName}, nil)
tr.On("GetByName", ctx, "invalid").Return(record.Type{}, record.ErrNoSuchType{TypeName: "invalid"})

t.Run("UpsertBulk", func(t *testing.T) {
var validPayload = `[{"urn": "test dagger", "name": "de-dagger-test", "service": "kafka", "data": {}}]`
Expand Down Expand Up @@ -297,6 +295,7 @@ func TestRecordHandler(t *testing.T) {
})
}
})

t.Run("GetByType", func(t *testing.T) {
type testCase struct {
Description string
Expand Down
156 changes: 5 additions & 151 deletions api/handlers/type_handler.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,14 @@
package handlers

import (
"encoding/json"
"fmt"
"net/http"
"reflect"
"strings"

"github.com/gorilla/mux"
"github.com/odpf/columbus/record"
"github.com/sirupsen/logrus"
)

var (
validClassifications map[record.TypeClassification]int
validClassificationsList string
)

func init() {
validClassifications = make(map[record.TypeClassification]int)
clsList := make([]string, len(record.AllTypeClassifications))
for idx, cls := range record.AllTypeClassifications {
validClassifications[cls] = 0
clsList[idx] = string(cls)
}
validClassificationsList = strings.Join(clsList, ",")
}

// TypeHandler exposes a REST interface to types
type TypeHandler struct {
typeRepo record.TypeRepository
Expand All @@ -43,122 +25,28 @@ func NewTypeHandler(log logrus.FieldLogger, er record.TypeRepository) *TypeHandl
}

func (h *TypeHandler) Get(w http.ResponseWriter, r *http.Request) {
types, err := h.typeRepo.GetAll(r.Context())
typesNameMap, err := h.typeRepo.GetAll(r.Context())
if err != nil {
internalServerError(w, h.log, "error fetching types")
return
}

counts, err := h.typeRepo.GetRecordsCount(r.Context())
if err != nil {
internalServerError(w, h.log, "error fetching records counts")
return
}

type TypeWithCount struct {
record.Type
Count int `json:"count"`
Name string `json:"name"`
Count int `json:"count"`
}

results := []TypeWithCount{}
for _, typ := range types {
count, _ := counts[typ.Name]

for typName, count := range typesNameMap {
results = append(results, TypeWithCount{
Type: typ,
Name: typName.String(),
Count: count,
})
}

writeJSON(w, http.StatusOK, results)
}

func (h *TypeHandler) Find(w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
recordType, err := h.typeRepo.GetByName(r.Context(), name)
if err != nil {
h.log.
Errorf("error fetching type \"%s\": %v", name, err)

var status int
var msg string
if _, ok := err.(record.ErrNoSuchType); ok {
status = http.StatusNotFound
msg = err.Error()
} else {
status = http.StatusInternalServerError
msg = fmt.Sprintf("error fetching type \"%s\"", name)
}

writeJSONError(w, status, msg)
return
}

writeJSON(w, http.StatusOK, recordType)
}

func (h *TypeHandler) Upsert(w http.ResponseWriter, r *http.Request) {
var payload record.Type
err := json.NewDecoder(r.Body).Decode(&payload)
if err != nil {
writeJSONError(w, http.StatusBadRequest, bodyParserErrorMsg(err))
return
}

payload = payload.Normalise()
if err := h.validateType(payload); err != nil {
writeJSONError(w, http.StatusBadRequest, err.Error())
return
}

err = h.typeRepo.CreateOrReplace(r.Context(), payload)
if err != nil {
h.log.
WithField("type", payload.Name).
Errorf("error creating/replacing type: %v", err)

var status int
var msg string
if _, ok := err.(record.ErrReservedTypeName); ok {
status = http.StatusUnprocessableEntity
msg = err.Error()
} else {
status = http.StatusInternalServerError
msg = fmt.Sprintf("error creating type: %v", err)
}

writeJSONError(w, status, msg)
return
}
h.log.Infof("created/updated %q type", payload.Name)
writeJSON(w, http.StatusCreated, payload)
}

func (h *TypeHandler) Delete(w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
err := h.typeRepo.Delete(r.Context(), name)
if err != nil {
h.log.
Errorf("error deleting type \"%s\": %v", name, err)

var status int
var msg string
if _, ok := err.(record.ErrReservedTypeName); ok {
status = http.StatusUnprocessableEntity
msg = err.Error()
} else {
status = http.StatusInternalServerError
msg = fmt.Sprintf("error deleting type \"%s\"", name)
}

writeJSONError(w, status, msg)
return
}

h.log.Infof("deleted type \"%s\"", name)
writeJSON(w, http.StatusNoContent, "success")
}

func (h *TypeHandler) parseSelectQuery(raw string) (fields []string) {
tokens := strings.Split(raw, ",")
for _, token := range tokens {
Expand Down Expand Up @@ -204,44 +92,10 @@ func (h *TypeHandler) validateRecord(record record.Record) error {
return nil
}

func (h *TypeHandler) validateType(e record.Type) error {
// TODO(Aman): write a generic zero-value validator that uses reflection
// TODO(Aman): how about moving this validation to the repository?
// TODO(Aman): use reflection to compute the key namespace for recordType.Fields
// TODO(Aman): add validation for recordType.Lineage
trim := strings.TrimSpace
switch {
case trim(e.Name) == "":
return fmt.Errorf("'name' is required")
case trim(string(e.Classification)) == "":
return fmt.Errorf("'classification' is required")
case isClassificationValid(e.Classification) == false:
return fmt.Errorf("'classification' must be one of [%s]", validClassificationsList)
}
return nil
}

func (h *TypeHandler) responseStatusForError(err error) (int, string) {
switch err.(type) {
case record.ErrNoSuchType, record.ErrNoSuchRecord:
return http.StatusNotFound, err.Error()
}
return http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError)
}

func isClassificationValid(cls record.TypeClassification) bool {
_, valid := validClassifications[cls]
return valid
}

func getJSONKeyNameForField(structure interface{}, field string) string {
structType := reflect.TypeOf(structure)
structField, exists := structType.FieldByName(field)
if !exists {
msg := fmt.Sprintf("no such Field %q in %q", field, structType.Name())
panic(msg)
}
tag := structField.Tag.Get("json")
items := strings.Split(tag, ",")
return strings.TrimSpace(items[0])
}
Loading

0 comments on commit 8111b08

Please sign in to comment.