diff --git a/README.md b/README.md index bcf98e6..f52ad66 100644 --- a/README.md +++ b/README.md @@ -413,7 +413,9 @@ A result is arbitrary `[]byte` data saved by a handler or callback via `JobCtx.S #### Get Result -```go +If the `jobID` does not exist, `ErrNotFound` will be returned + +```go b, err := srv.GetResult(ctx, jobID) if err != nil { log.Fatal(err) diff --git a/chains.go b/chains.go index 3084b5d..93326f1 100644 --- a/chains.go +++ b/chains.go @@ -143,7 +143,7 @@ func (s *Server) setChainMessage(ctx context.Context, c ChainMessage) error { } func (s *Server) getChainMessage(ctx context.Context, id string) (ChainMessage, error) { - b, err := s.results.Get(ctx, chainPrefix+id) + b, err := s.GetResult(ctx, chainPrefix+id) if err != nil { return ChainMessage{}, err } diff --git a/groups.go b/groups.go index 16a16a3..9c1a8d0 100644 --- a/groups.go +++ b/groups.go @@ -147,7 +147,7 @@ func (s *Server) setGroupMessage(ctx context.Context, g GroupMessage) error { } func (s *Server) getGroupMessage(ctx context.Context, id string) (GroupMessage, error) { - b, err := s.results.Get(ctx, groupPrefix+id) + b, err := s.GetResult(ctx, groupPrefix+id) if err != nil { return GroupMessage{}, err } diff --git a/interfaces.go b/interfaces.go index 4c647f1..8324f23 100644 --- a/interfaces.go +++ b/interfaces.go @@ -7,6 +7,8 @@ import ( type Results interface { Get(ctx context.Context, id string) ([]byte, error) + // NilError is used to check internally if the "id" is missing + NilError() error Set(ctx context.Context, id string, b []byte) error // DeleteJob removes the job's saved metadata from the store DeleteJob(ctx context.Context, id string) error diff --git a/jobs.go b/jobs.go index 969604e..7f1492b 100644 --- a/jobs.go +++ b/jobs.go @@ -256,7 +256,7 @@ func (s *Server) GetJob(ctx context.Context, id string) (JobMessage, error) { defer span.End() } - b, err := s.results.Get(ctx, jobPrefix+id) + b, err := s.GetResult(ctx, jobPrefix+id) if err != nil { s.spanError(span, err) return JobMessage{}, err diff --git a/jobs_test.go b/jobs_test.go index a8128c0..b4ddfef 100644 --- a/jobs_test.go +++ b/jobs_test.go @@ -3,6 +3,7 @@ package tasqueue import ( "context" "encoding/json" + "errors" "testing" "time" ) @@ -226,8 +227,8 @@ func TestDeleteJob(t *testing.T) { } _, err = srv.GetResult(ctx, uuid) - if err.Error() != "value not found" { - t.Fatalf("job results not deleted") + if !errors.Is(err, ErrNotFound) { + t.Fatalf("job results not deleted: %v", err) } } diff --git a/results/in-memory/results.go b/results/in-memory/results.go index 457c016..de376c6 100644 --- a/results/in-memory/results.go +++ b/results/in-memory/results.go @@ -2,7 +2,7 @@ package inmemory import ( "context" - "fmt" + "errors" "sync" ) @@ -21,17 +21,23 @@ func New() *Results { } } +var errNotFound = errors.New("could not find result in in-memory store") + func (r *Results) Get(ctx context.Context, id string) ([]byte, error) { r.mu.Lock() v, ok := r.store[id] r.mu.Unlock() if !ok { - return nil, fmt.Errorf("value not found") + return nil, errNotFound } return v, nil } +func (r *Results) NilError() error { + return errNotFound +} + func (r *Results) DeleteJob(ctx context.Context, id string) error { r.mu.Lock() delete(r.store, id) diff --git a/results/nats-js/results.go b/results/nats-js/results.go index 9607d9b..989745f 100644 --- a/results/nats-js/results.go +++ b/results/nats-js/results.go @@ -66,6 +66,10 @@ func (r *Results) Get(_ context.Context, id string) ([]byte, error) { return rs.Value(), nil } +func (r *Results) NilError() error { + return nats.ErrKeyNotFound +} + func (r *Results) Set(_ context.Context, id string, b []byte) error { if _, err := r.conn.Put(resultPrefix+id, b); err != nil { return err diff --git a/results/redis/results.go b/results/redis/results.go index 9da953b..06a56d1 100644 --- a/results/redis/results.go +++ b/results/redis/results.go @@ -2,7 +2,6 @@ package redis import ( "context" - "errors" "time" "github.com/go-redis/redis/v8" @@ -128,9 +127,13 @@ func (r *Results) Get(ctx context.Context, id string) ([]byte, error) { r.lo.Debug("getting result for job", "id", id) rs, err := r.conn.Get(ctx, resultPrefix+id).Bytes() - if err != nil && !errors.Is(err, redis.Nil) { + if err != nil { return nil, err } return rs, nil } + +func (r *Results) NilError() error { + return redis.Nil +} diff --git a/server.go b/server.go index 7da0b0c..0e45c3a 100644 --- a/server.go +++ b/server.go @@ -2,6 +2,7 @@ package tasqueue import ( "context" + "errors" "fmt" "runtime" "sync" @@ -160,14 +161,21 @@ func (s *Server) GetTasks() []string { return t } +var ErrNotFound = errors.New("result not found") + // GetResult() accepts a ID and returns the result of the job in the results store. func (s *Server) GetResult(ctx context.Context, id string) ([]byte, error) { b, err := s.results.Get(ctx, id) - if err != nil { - return nil, err + if err == nil { + return b, nil + } + + // Check if error is due to key being invalid + if errors.Is(err, s.results.NilError()) { + return nil, ErrNotFound } - return b, nil + return nil, err } // GetPending() returns the pending job message's in the broker's queue.