diff --git a/examples/redis/main.go b/examples/redis/main.go index 4238eb7..a6d5b38 100644 --- a/examples/redis/main.go +++ b/examples/redis/main.go @@ -26,9 +26,10 @@ func main() { DB: 0, }, lo), Results: rr.New(rr.Options{ - Addrs: []string{"127.0.0.1:6379"}, - Password: "", - DB: 0, + Addrs: []string{"127.0.0.1:6379"}, + Password: "", + DB: 0, + MetaExpiry: time.Second * 5, }, lo), Logger: lo, }) @@ -41,28 +42,33 @@ func main() { log.Fatal(err) } - var group []tasqueue.Job + go srv.Start(ctx) - for i := 0; i < 3; i++ { - b, _ := json.Marshal(tasks.SumPayload{Arg1: i, Arg2: 4}) - task, err := tasqueue.NewJob("add", b, tasqueue.JobOpts{}) - if err != nil { - log.Fatal(err) - } - group = append(group, task) + b, _ := json.Marshal(tasks.SumPayload{Arg1: 5, Arg2: 4}) + task, err := tasqueue.NewJob("add", b, tasqueue.JobOpts{}) + if err != nil { + log.Fatal(err) } + srv.Enqueue(ctx, task) - t, _ := tasqueue.NewGroup(group, tasqueue.GroupOpts{}) - x, _ := srv.EnqueueGroup(ctx, t) - go func() { - for { - select { - case <-time.Tick(time.Second * 1): - fmt.Println(srv.GetGroup(ctx, x)) + b, _ = json.Marshal(tasks.SumPayload{Arg1: 5, Arg2: 4}) + task, err = tasqueue.NewJob("add", b, tasqueue.JobOpts{}) + if err != nil { + log.Fatal(err) + } + srv.Enqueue(ctx, task) + fmt.Println("exit..") + for { + select { + case <-time.NewTicker(time.Second * 1).C: + ids, err := srv.GetSuccess(ctx) + if err != nil { + log.Fatal(err) } + + log.Println(ids) } - }() - srv.Start(ctx) + } // Create a task payload. fmt.Println("exit..") diff --git a/go.mod b/go.mod index 0ccd22e..bc2899e 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/go-redis/redis v6.15.9+incompatible - github.com/go-redis/redis/v8 v8.11.5 + github.com/go-redis/redis/v8 v8.11.5 github.com/google/uuid v1.3.0 github.com/nats-io/nats.go v1.15.0 github.com/robfig/cron/v3 v3.0.1 diff --git a/groups.go b/groups.go index 9c1a8d0..c85b790 100644 --- a/groups.go +++ b/groups.go @@ -82,7 +82,7 @@ func (s *Server) EnqueueGroup(ctx context.Context, t Group) (string, error) { func (s *Server) GetGroup(ctx context.Context, id string) (GroupMessage, error) { g, err := s.getGroupMessage(ctx, id) if err != nil { - return g, nil + return g, err } // If the group status is either "done" or "failed". // Do an early return diff --git a/jobs_test.go b/jobs_test.go index b4ddfef..04410cd 100644 --- a/jobs_test.go +++ b/jobs_test.go @@ -163,7 +163,7 @@ func TestSaveJob(t *testing.T) { } // Wait for task to be consumed & processed. - time.Sleep(time.Second) + time.Sleep(time.Second * 2) results, err := srv.GetResult(ctx, uuid) if err != nil { @@ -219,7 +219,7 @@ func TestDeleteJob(t *testing.T) { } if string(results) != savedData { - t.Fatalf("saved results don't match results fetched.\nsaved:%v\nfetched:%v", savedData, results) + t.Fatalf("saved results don't match results fetched.\nsaved:%v\nfetched:%v", savedData, string(results)) } if err := srv.DeleteJob(ctx, uuid); err != nil { diff --git a/results/redis/results.go b/results/redis/results.go index 06a56d1..99cfc7c 100644 --- a/results/redis/results.go +++ b/results/redis/results.go @@ -2,6 +2,7 @@ package redis import ( "context" + "strconv" "time" "github.com/go-redis/redis/v8" @@ -9,8 +10,7 @@ import ( ) const ( - defaultExpiry = 0 - resultPrefix = "tasqueue:results:" + resultPrefix = "tq:res:" // Suffix for hashmaps storing success/failed job ids success = "success" @@ -31,6 +31,8 @@ type Options struct { ReadTimeout time.Duration WriteTimeout time.Duration IdleTimeout time.Duration + Expiry time.Duration + MetaExpiry time.Duration MinIdleConns int } @@ -43,7 +45,7 @@ func DefaultRedis() Options { } func New(o Options, lo logf.Logger) *Results { - return &Results{ + rs := &Results{ opt: o, conn: redis.NewUniversalClient( &redis.UniversalOptions{ @@ -59,16 +61,13 @@ func New(o Options, lo logf.Logger) *Results { ), lo: lo, } -} -func (r *Results) GetSuccess(ctx context.Context) ([]string, error) { - r.lo.Debug("getting successful jobs") - rs, err := r.conn.LRange(ctx, resultPrefix+success, 0, -1).Result() - if err != nil { - return nil, err + // TODO: pass ctx here somehow + if o.MetaExpiry != 0 { + go rs.expireMeta(o.MetaExpiry) } - return rs, nil + return rs } func (r *Results) DeleteJob(ctx context.Context, id string) error { @@ -88,9 +87,13 @@ func (r *Results) DeleteJob(ctx context.Context, id string) error { return nil } -func (r *Results) GetFailed(ctx context.Context) ([]string, error) { - r.lo.Debug("getting failed jobs") - rs, err := r.conn.LRange(ctx, resultPrefix+failed, 0, -1).Result() +func (r *Results) GetSuccess(ctx context.Context) ([]string, error) { + // Fetch the failed tasks with score less than current time + r.lo.Debug("getting successful jobs") + rs, err := r.conn.ZRevRangeByScore(ctx, resultPrefix+success, &redis.ZRangeBy{ + Min: "0", + Max: strconv.FormatInt(time.Now().UnixNano(), 10), + }).Result() if err != nil { return nil, err } @@ -98,29 +101,39 @@ func (r *Results) GetFailed(ctx context.Context) ([]string, error) { return rs, nil } -func (r *Results) SetSuccess(ctx context.Context, id string) error { - r.lo.Debug("setting job as successful") - _, err := r.conn.RPush(ctx, resultPrefix+success, id).Result() +func (r *Results) GetFailed(ctx context.Context) ([]string, error) { + // Fetch the failed tasks with score less than current time + r.lo.Debug("getting failed jobs") + rs, err := r.conn.ZRevRangeByScore(ctx, resultPrefix+failed, &redis.ZRangeBy{ + Min: "0", + Max: strconv.FormatInt(time.Now().UnixNano(), 10), + }).Result() if err != nil { - return err + return nil, err } - return nil + return rs, nil } -func (r *Results) SetFailed(ctx context.Context, id string) error { - r.lo.Debug("setting job as failed") - _, err := r.conn.RPush(ctx, resultPrefix+failed, id).Result() - if err != nil { - return err - } +func (r *Results) SetSuccess(ctx context.Context, id string) error { + r.lo.Debug("setting job as successful", "id", id) + return r.conn.ZAdd(ctx, resultPrefix+success, &redis.Z{ + Score: float64(time.Now().UnixNano()), + Member: id, + }).Err() +} - return nil +func (r *Results) SetFailed(ctx context.Context, id string) error { + r.lo.Debug("setting job as failed", "id", id) + return r.conn.ZAdd(ctx, resultPrefix+failed, &redis.Z{ + Score: float64(time.Now().UnixNano()), + Member: id, + }).Err() } func (r *Results) Set(ctx context.Context, id string, b []byte) error { r.lo.Debug("setting result for job", "id", id) - return r.conn.Set(ctx, resultPrefix+id, b, defaultExpiry).Err() + return r.conn.Set(ctx, resultPrefix+id, b, r.opt.Expiry).Err() } func (r *Results) Get(ctx context.Context, id string) ([]byte, error) { @@ -134,6 +147,36 @@ func (r *Results) Get(ctx context.Context, id string) ([]byte, error) { return rs, nil } +// TODO: accpet a ctx here and shutdown gracefully +func (r *Results) expireMeta(ttl time.Duration) { + r.lo.Info("starting results meta purger", "ttl", ttl) + + var ( + tk = time.NewTicker(ttl) + ) + + for { + select { + // case <-ctx.Done(): + // r.lo.Info("shutting down meta purger", "ttl", ttl) + // return + case <-tk.C: + now := time.Now().UnixNano() - int64(ttl) + score := strconv.FormatInt(now, 10) + + r.lo.Debug("purging failed results metadata", "score", score) + if err := r.conn.ZRemRangeByScore(context.Background(), resultPrefix+failed, "0", score).Err(); err != nil { + r.lo.Error("could not expire success/failed metadata", "err", err) + } + + r.lo.Debug("purging success results metadata", "score", score) + if err := r.conn.ZRemRangeByScore(context.Background(), resultPrefix+success, "0", score).Err(); err != nil { + r.lo.Error("could not expire success/failed metadata", "err", err) + } + } + } +} + func (r *Results) NilError() error { return redis.Nil } diff --git a/server.go b/server.go index 0e45c3a..e875d9d 100644 --- a/server.go +++ b/server.go @@ -64,7 +64,7 @@ type TaskOpts struct { // RegisterTask maps a new task against the tasks map on the server. // It accepts different options for the task (to set callbacks). func (s *Server) RegisterTask(name string, fn handler, opts TaskOpts) error { - s.log.Info("added handler", "name", name) + s.log.Debug("registered handler", "name", name, "options", opts) if opts.Queue == "" { opts.Queue = DefaultQueue @@ -247,14 +247,14 @@ func (s *Server) Start(ctx context.Context) { // consume() listens on the queue for task messages and passes the task to processor. func (s *Server) consume(ctx context.Context, work chan []byte, queue string) { - s.log.Info("starting task consumer..") + s.log.Debug("starting task consumer..") s.broker.Consume(ctx, work, queue) } // process() listens on the work channel for tasks. On receiving a task it checks the // processors map and passes payload to relevant processor. func (s *Server) process(ctx context.Context, w chan []byte) { - s.log.Info("starting processor..") + s.log.Debug("starting processor..") for { var span spans.Span if s.traceProv != nil {