Skip to content

Commit

Permalink
feat: return ErrNotFound if results missing
Browse files Browse the repository at this point in the history
  • Loading branch information
Lakshay Kalbhor committed Aug 14, 2023
1 parent 14957be commit 4bc4a43
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 13 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,9 @@ A result is arbitrary `[]byte` data saved by a handler or callback via `JobCtx.S

#### Get Result

```go
If the `jobID` does not exist, `ErrNotFound` will be returned

```go
b, err := srv.GetResult(ctx, jobID)
if err != nil {
log.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion chains.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (s *Server) setChainMessage(ctx context.Context, c ChainMessage) error {
}

func (s *Server) getChainMessage(ctx context.Context, id string) (ChainMessage, error) {
b, err := s.results.Get(ctx, chainPrefix+id)
b, err := s.GetResult(ctx, chainPrefix+id)
if err != nil {
return ChainMessage{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *Server) setGroupMessage(ctx context.Context, g GroupMessage) error {
}

func (s *Server) getGroupMessage(ctx context.Context, id string) (GroupMessage, error) {
b, err := s.results.Get(ctx, groupPrefix+id)
b, err := s.GetResult(ctx, groupPrefix+id)
if err != nil {
return GroupMessage{}, err
}
Expand Down
2 changes: 2 additions & 0 deletions interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

type Results interface {
Get(ctx context.Context, id string) ([]byte, error)
// NilError is used to check internally if the "id" is missing
NilError() error
Set(ctx context.Context, id string, b []byte) error
// DeleteJob removes the job's saved metadata from the store
DeleteJob(ctx context.Context, id string) error
Expand Down
2 changes: 1 addition & 1 deletion jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (s *Server) GetJob(ctx context.Context, id string) (JobMessage, error) {
defer span.End()
}

b, err := s.results.Get(ctx, jobPrefix+id)
b, err := s.GetResult(ctx, jobPrefix+id)
if err != nil {
s.spanError(span, err)
return JobMessage{}, err
Expand Down
5 changes: 3 additions & 2 deletions jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tasqueue
import (
"context"
"encoding/json"
"errors"
"testing"
"time"
)
Expand Down Expand Up @@ -226,8 +227,8 @@ func TestDeleteJob(t *testing.T) {
}

_, err = srv.GetResult(ctx, uuid)
if err.Error() != "value not found" {
t.Fatalf("job results not deleted")
if !errors.Is(err, ErrNotFound) {
t.Fatalf("job results not deleted: %v", err)
}

}
Expand Down
10 changes: 8 additions & 2 deletions results/in-memory/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package inmemory

import (
"context"
"fmt"
"errors"
"sync"
)

Expand All @@ -21,17 +21,23 @@ func New() *Results {
}
}

var errNotFound = errors.New("could not find result in in-memory store")

func (r *Results) Get(ctx context.Context, id string) ([]byte, error) {
r.mu.Lock()
v, ok := r.store[id]
r.mu.Unlock()
if !ok {
return nil, fmt.Errorf("value not found")
return nil, errNotFound
}

return v, nil
}

func (r *Results) NilError() error {
return errNotFound
}

func (r *Results) DeleteJob(ctx context.Context, id string) error {
r.mu.Lock()
delete(r.store, id)
Expand Down
4 changes: 4 additions & 0 deletions results/nats-js/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func (r *Results) Get(_ context.Context, id string) ([]byte, error) {
return rs.Value(), nil
}

func (r *Results) NilError() error {
return nats.ErrKeyNotFound
}

func (r *Results) Set(_ context.Context, id string, b []byte) error {
if _, err := r.conn.Put(resultPrefix+id, b); err != nil {
return err
Expand Down
7 changes: 5 additions & 2 deletions results/redis/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package redis

import (
"context"
"errors"
"time"

"github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -128,9 +127,13 @@ func (r *Results) Get(ctx context.Context, id string) ([]byte, error) {
r.lo.Debug("getting result for job", "id", id)

rs, err := r.conn.Get(ctx, resultPrefix+id).Bytes()
if err != nil && !errors.Is(err, redis.Nil) {
if err != nil {
return nil, err
}

return rs, nil
}

func (r *Results) NilError() error {
return redis.Nil
}
14 changes: 11 additions & 3 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tasqueue

import (
"context"
"errors"
"fmt"
"runtime"
"sync"
Expand Down Expand Up @@ -160,14 +161,21 @@ func (s *Server) GetTasks() []string {
return t
}

var ErrNotFound = errors.New("result not found")

// GetResult() accepts a ID and returns the result of the job in the results store.
func (s *Server) GetResult(ctx context.Context, id string) ([]byte, error) {
b, err := s.results.Get(ctx, id)
if err != nil {
return nil, err
if err == nil {
return b, nil
}

// Check if error is due to key being invalid
if errors.Is(err, s.results.NilError()) {
return nil, ErrNotFound
}

return b, nil
return nil, err
}

// GetPending() returns the pending job message's in the broker's queue.
Expand Down

0 comments on commit 4bc4a43

Please sign in to comment.