Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add /tasks patch; return 202 for task actions and updates. #660

Merged
merged 10 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 0 additions & 28 deletions api/base.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package api

import (
"bytes"
"database/sql"
"encoding/json"
"errors"
Expand Down Expand Up @@ -99,33 +98,6 @@ func (h *BaseHandler) pk(ctx *gin.Context) (id uint) {
return
}

// modBody updates the body using the `mod` function.
// 1. read the body.
// 2. mod()
// 3. write body.
func (h *BaseHandler) modBody(
ctx *gin.Context,
r interface{},
mod func(bool) error) (err error) {
//
withBody := false
if ctx.Request.ContentLength > 0 {
withBody = true
err = h.Bind(ctx, r)
if err != nil {
return
}
}
err = mod(withBody)
if err != nil {
return
}
b, _ := json.Marshal(r)
bfr := bytes.NewBuffer(b)
ctx.Request.Body = io.NopCloser(bfr)
return
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hint: removed in favor of simpler, more direct approach.

// CurrentUser gets username from Keycloak auth token.
func (h *BaseHandler) CurrentUser(ctx *gin.Context) (user string) {
rtx := WithContext(ctx)
Expand Down
69 changes: 31 additions & 38 deletions api/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
)

const (
LocatorParam = "locator"
Submit = "submit"
)

Copy link
Contributor Author

@jortel jortel Jun 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hint: LocatorParam unused. The /tasks now supports filtering.

// TaskHandler handles task routes.
Expand All @@ -55,10 +55,11 @@ func (h TaskHandler) AddRoutes(e *gin.Engine) {
routeGroup.POST(TasksRoot, h.Create)
routeGroup.GET(TaskRoot, h.Get)
routeGroup.PUT(TaskRoot, h.Update)
routeGroup.PATCH(TaskRoot, Transaction, h.Update)
routeGroup.DELETE(TaskRoot, h.Delete)
routeGroup.GET(TasksReportQueueRoot, h.Queued)
// Actions
routeGroup.PUT(TaskSubmitRoot, h.Submit, h.Update)
routeGroup.PUT(TaskSubmitRoot, Transaction, h.Submit)
Copy link
Contributor Author

@jortel jortel Jun 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hint: with complexity delegated to the task manager, simpler to implement submit as a patch and no longer chain submit; state=Ready; update().
BaseHandler.modBody() should probably go away.

routeGroup.PUT(TaskCancelRoot, h.Cancel)
// Bucket
routeGroup = e.Group("/")
Expand Down Expand Up @@ -335,73 +336,66 @@ func (h TaskHandler) Delete(ctx *gin.Context) {
// @description Update a task.
// @tags tasks
// @accept json
// @success 204
// @success 202
// @router /tasks/{id} [put]
// @param id path int true "Task ID"
// @param task body Task true "Task data"
func (h TaskHandler) Update(ctx *gin.Context) {
id := h.pk(ctx)
m := &model.Task{}
err := h.DB(ctx).First(m, id).Error
if err != nil {
_ = ctx.Error(err)
return
}
r := &Task{}
err := h.Bind(ctx, r)
if ctx.Request.Method == http.MethodPatch &&
ctx.Request.ContentLength > 0 {
r.With(m)
}
err = h.Bind(ctx, r)
if err != nil {
_ = ctx.Error(err)
return
}
r.ID = id
if _, found := ctx.Get(Submit); found {
r.State = tasking.Ready
}
m = r.Model()
m.ID = id
m.UpdateUser = h.CurrentUser(ctx)
rtx := WithContext(ctx)
task := &tasking.Task{}
task.With(r.Model())
task.UpdateUser = h.BaseHandler.CurrentUser(ctx)
task.With(m)
err = rtx.TaskManager.Update(h.DB(ctx), task)
if err != nil {
_ = ctx.Error(err)
return
}

h.Status(ctx, http.StatusNoContent)
h.Status(ctx, http.StatusAccepted)
}

// Submit godoc
// @summary Submit a task.
// @description Submit a task.
// @description Patch and submit a task.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hint: Much better for the caller to be able to patch a task when submitting rather than update. This will support submit with minimal or no patch.

// @tags tasks
// @accept json
// @success 204
// @success 202
// @router /tasks/{id}/submit [put]
// @param id path int true "Task ID"
// @param task body Task false "Task data (optional)"
func (h TaskHandler) Submit(ctx *gin.Context) {
id := h.pk(ctx)
r := &Task{}
err := h.findRefs(ctx, r)
if err != nil {
_ = ctx.Error(err)
return
}
mod := func(withBody bool) (err error) {
if !withBody {
m := r.Model()
err = h.DB(ctx).First(m, id).Error
if err != nil {
return
}
r.With(m)
}
r.State = tasking.Ready
return
}
err = h.modBody(ctx, r, mod)
if err != nil {
_ = ctx.Error(err)
return
}
ctx.Next()
ctx.Set(Submit, true)
ctx.Request.Method = http.MethodPatch
h.Update(ctx)
}

// Cancel godoc
// @summary Cancel a task.
// @description Cancel a task.
// @tags tasks
// @success 204
// @success 202
// @router /tasks/{id}/cancel [put]
// @param id path int true "Task ID"
func (h TaskHandler) Cancel(ctx *gin.Context) {
Expand All @@ -413,7 +407,7 @@ func (h TaskHandler) Cancel(ctx *gin.Context) {
return
}

h.Status(ctx, http.StatusNoContent)
h.Status(ctx, http.StatusAccepted)
}

// BucketGet godoc
Expand Down Expand Up @@ -735,7 +729,6 @@ type Task struct {
TTL TTL `json:"ttl,omitempty" yaml:",omitempty"`
Data any `json:"data,omitempty" yaml:",omitempty"`
Application *Ref `json:"application,omitempty" yaml:",omitempty"`
Actions []string `json:"actions,omitempty" yaml:",omitempty"`
Bucket *Ref `json:"bucket,omitempty" yaml:",omitempty"`
Pod string `json:"pod,omitempty" yaml:",omitempty"`
Retries int `json:"retries,omitempty" yaml:",omitempty"`
Expand Down
59 changes: 23 additions & 36 deletions api/taskgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ func (h TaskGroupHandler) AddRoutes(e *gin.Engine) {
routeGroup.GET(TaskGroupsRoot+"/", h.List)
routeGroup.POST(TaskGroupsRoot, h.Create)
routeGroup.PUT(TaskGroupRoot, h.Update)
routeGroup.PATCH(TaskGroupRoot, Transaction, h.Update)
routeGroup.GET(TaskGroupRoot, h.Get)
routeGroup.PUT(TaskGroupSubmitRoot, h.Submit, h.Update)
routeGroup.PUT(TaskGroupSubmitRoot, Transaction, h.Submit)
routeGroup.DELETE(TaskGroupRoot, h.Delete)
// Bucket
routeGroup = e.Group("/")
Expand Down Expand Up @@ -175,18 +176,25 @@ func (h TaskGroupHandler) Create(ctx *gin.Context) {
// @param task body TaskGroup true "Task data"
func (h TaskGroupHandler) Update(ctx *gin.Context) {
id := h.pk(ctx)
updated := &TaskGroup{}
err := h.Bind(ctx, updated)
m := &model.TaskGroup{}
err := h.DB(ctx).First(m, id).Error
if err != nil {
_ = ctx.Error(err)
return
}
current := &model.TaskGroup{}
err = h.DB(ctx).First(current, id).Error
r := &TaskGroup{}
if ctx.Request.Method == http.MethodPatch &&
ctx.Request.ContentLength > 0 {
r.With(m)
}
err = h.Bind(ctx, r)
if err != nil {
_ = ctx.Error(err)
return
}
err = h.findRefs(ctx, updated)
if _, found := ctx.Get(Submit); found {
r.State = tasking.Ready
}
err = h.findRefs(ctx, r)
if err != nil {
_ = ctx.Error(err)
return
Expand All @@ -196,10 +204,10 @@ func (h TaskGroupHandler) Update(ctx *gin.Context) {
clause.Associations,
"BucketID",
"Bucket")
m := updated.Model()
m = r.Model()
m.ID = id
m.UpdateUser = h.BaseHandler.CurrentUser(ctx)
switch updated.State {
m.UpdateUser = h.CurrentUser(ctx)
switch m.State {
case "", tasking.Created:
err = db.Save(m).Error
if err != nil {
Expand Down Expand Up @@ -230,6 +238,7 @@ func (h TaskGroupHandler) Update(ctx *gin.Context) {
for i := range m.Tasks {
task := &tasking.Task{}
task.With(&m.Tasks[i])
task.CreateUser = h.CurrentUser(ctx)
err = rtx.TaskManager.Create(h.DB(ctx), task)
if err != nil {
_ = ctx.Error(err)
Expand Down Expand Up @@ -284,39 +293,17 @@ func (h TaskGroupHandler) Delete(ctx *gin.Context) {

// Submit godoc
// @summary Submit a task group.
// @description Submit a task group.
// @description Patch and submit a task group.
// @tags taskgroups
// @accept json
// @success 204
// @router /taskgroups/{id}/submit [put]
// @param id path int true "TaskGroup ID"
// @param taskgroup body TaskGroup false "TaskGroup data (optional)"
func (h TaskGroupHandler) Submit(ctx *gin.Context) {
id := h.pk(ctx)
r := &TaskGroup{}
err := h.findRefs(ctx, r)
if err != nil {
_ = ctx.Error(err)
return
}
mod := func(withBody bool) (err error) {
if !withBody {
m := r.Model()
err = h.DB(ctx).First(m, id).Error
if err != nil {
return
}
r.With(m)
}
r.State = tasking.Ready
return
}
err = h.modBody(ctx, r, mod)
if err != nil {
_ = ctx.Error(err)
return
}
ctx.Next()
ctx.Set(Submit, true)
ctx.Request.Method = http.MethodPatch
h.Update(ctx)
}

// BucketGet godoc
Expand Down
57 changes: 56 additions & 1 deletion binding/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ func (r *Client) Post(path string, object interface{}) (err error) {
}
status := response.StatusCode
switch status {
case http.StatusAccepted:
case http.StatusNoContent:
case http.StatusOK,
http.StatusCreated:
var body []byte
Expand All @@ -185,7 +187,6 @@ func (r *Client) Post(path string, object interface{}) (err error) {
err = liberr.Wrap(err)
return
}
case http.StatusNoContent:
default:
err = r.restError(response)
}
Expand Down Expand Up @@ -223,6 +224,60 @@ func (r *Client) Put(path string, object interface{}, params ...Param) (err erro
}
status := response.StatusCode
switch status {
case http.StatusAccepted:
case http.StatusNoContent:
case http.StatusOK,
http.StatusCreated:
var body []byte
body, err = io.ReadAll(response.Body)
if err != nil {
err = liberr.Wrap(err)
return
}
err = json.Unmarshal(body, object)
if err != nil {
err = liberr.Wrap(err)
return
}
default:
err = r.restError(response)
}

return
}

// Patch a resource.
func (r *Client) Patch(path string, object interface{}, params ...Param) (err error) {
request := func() (request *http.Request, err error) {
bfr, err := json.Marshal(object)
if err != nil {
err = liberr.Wrap(err)
return
}
reader := bytes.NewReader(bfr)
request = &http.Request{
Header: http.Header{},
Method: http.MethodPatch,
Body: io.NopCloser(reader),
URL: r.join(path),
}
request.Header.Set(api.Accept, binding.MIMEJSON)
if len(params) > 0 {
q := request.URL.Query()
for _, p := range params {
q.Add(p.Key, p.Value)
}
request.URL.RawQuery = q.Encode()
}
return
}
response, err := r.send(request)
if err != nil {
return
}
status := response.StatusCode
switch status {
case http.StatusAccepted:
case http.StatusNoContent:
case http.StatusOK,
http.StatusCreated:
Expand Down
7 changes: 7 additions & 0 deletions binding/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ func (h *Task) Update(r *api.Task) (err error) {
return
}

// Patch a Task.
func (h *Task) Patch(id uint, r any) (err error) {
path := Path(api.TaskRoot).Inject(Params{api.ID: id})
err = h.client.Patch(path, r)
return
}

// Delete a Task.
func (h *Task) Delete(id uint) (err error) {
err = h.client.Delete(Path(api.TaskRoot).Inject(Params{api.ID: id}))
Expand Down
Loading
Loading