Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add expiry on result metadata #37

Merged
merged 2 commits into from
Sep 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions examples/redis/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ func main() {
DB: 0,
}, lo),
Results: rr.New(rr.Options{
Addrs: []string{"127.0.0.1:6379"},
Password: "",
DB: 0,
Addrs: []string{"127.0.0.1:6379"},
Password: "",
DB: 0,
MetaExpiry: time.Second * 5,
}, lo),
Logger: lo,
})
Expand All @@ -41,28 +42,33 @@ func main() {
log.Fatal(err)
}

var group []tasqueue.Job
go srv.Start(ctx)

for i := 0; i < 3; i++ {
b, _ := json.Marshal(tasks.SumPayload{Arg1: i, Arg2: 4})
task, err := tasqueue.NewJob("add", b, tasqueue.JobOpts{})
if err != nil {
log.Fatal(err)
}
group = append(group, task)
b, _ := json.Marshal(tasks.SumPayload{Arg1: 5, Arg2: 4})
task, err := tasqueue.NewJob("add", b, tasqueue.JobOpts{})
if err != nil {
log.Fatal(err)
}
srv.Enqueue(ctx, task)

t, _ := tasqueue.NewGroup(group, tasqueue.GroupOpts{})
x, _ := srv.EnqueueGroup(ctx, t)
go func() {
for {
select {
case <-time.Tick(time.Second * 1):
fmt.Println(srv.GetGroup(ctx, x))
b, _ = json.Marshal(tasks.SumPayload{Arg1: 5, Arg2: 4})
task, err = tasqueue.NewJob("add", b, tasqueue.JobOpts{})
if err != nil {
log.Fatal(err)
}
srv.Enqueue(ctx, task)
fmt.Println("exit..")
for {
select {
case <-time.NewTicker(time.Second * 1).C:
ids, err := srv.GetSuccess(ctx)
if err != nil {
log.Fatal(err)
}

log.Println(ids)
}
}()
srv.Start(ctx)
}

// Create a task payload.
fmt.Println("exit..")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.18

require (
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-redis/redis/v8 v8.11.5
github.com/go-redis/redis/v8 v8.11.5
github.com/google/uuid v1.3.0
github.com/nats-io/nats.go v1.15.0
github.com/robfig/cron/v3 v3.0.1
Expand Down
2 changes: 1 addition & 1 deletion groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s *Server) EnqueueGroup(ctx context.Context, t Group) (string, error) {
func (s *Server) GetGroup(ctx context.Context, id string) (GroupMessage, error) {
g, err := s.getGroupMessage(ctx, id)
if err != nil {
return g, nil
return g, err
}
// If the group status is either "done" or "failed".
// Do an early return
Expand Down
4 changes: 2 additions & 2 deletions jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestSaveJob(t *testing.T) {
}

// Wait for task to be consumed & processed.
time.Sleep(time.Second)
time.Sleep(time.Second * 2)

results, err := srv.GetResult(ctx, uuid)
if err != nil {
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestDeleteJob(t *testing.T) {
}

if string(results) != savedData {
t.Fatalf("saved results don't match results fetched.\nsaved:%v\nfetched:%v", savedData, results)
t.Fatalf("saved results don't match results fetched.\nsaved:%v\nfetched:%v", savedData, string(results))
}

if err := srv.DeleteJob(ctx, uuid); err != nil {
Expand Down
95 changes: 69 additions & 26 deletions results/redis/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package redis

import (
"context"
"strconv"
"time"

"github.com/go-redis/redis/v8"
"github.com/zerodha/logf"
)

const (
defaultExpiry = 0
resultPrefix = "tasqueue:results:"
resultPrefix = "tq:res:"

// Suffix for hashmaps storing success/failed job ids
success = "success"
Expand All @@ -31,6 +31,8 @@ type Options struct {
ReadTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
Expiry time.Duration
MetaExpiry time.Duration
MinIdleConns int
}

Expand All @@ -43,7 +45,7 @@ func DefaultRedis() Options {
}

func New(o Options, lo logf.Logger) *Results {
return &Results{
rs := &Results{
opt: o,
conn: redis.NewUniversalClient(
&redis.UniversalOptions{
Expand All @@ -59,16 +61,13 @@ func New(o Options, lo logf.Logger) *Results {
),
lo: lo,
}
}

func (r *Results) GetSuccess(ctx context.Context) ([]string, error) {
r.lo.Debug("getting successful jobs")
rs, err := r.conn.LRange(ctx, resultPrefix+success, 0, -1).Result()
if err != nil {
return nil, err
// TODO: pass ctx here somehow
if o.MetaExpiry != 0 {
go rs.expireMeta(o.MetaExpiry)
}

return rs, nil
return rs
}

func (r *Results) DeleteJob(ctx context.Context, id string) error {
Expand All @@ -88,39 +87,53 @@ func (r *Results) DeleteJob(ctx context.Context, id string) error {
return nil
}

func (r *Results) GetFailed(ctx context.Context) ([]string, error) {
r.lo.Debug("getting failed jobs")
rs, err := r.conn.LRange(ctx, resultPrefix+failed, 0, -1).Result()
func (r *Results) GetSuccess(ctx context.Context) ([]string, error) {
// Fetch the failed tasks with score less than current time
r.lo.Debug("getting successful jobs")
rs, err := r.conn.ZRevRangeByScore(ctx, resultPrefix+success, &redis.ZRangeBy{
Min: "0",
Max: strconv.FormatInt(time.Now().UnixNano(), 10),
}).Result()
if err != nil {
return nil, err
}

return rs, nil
}

func (r *Results) SetSuccess(ctx context.Context, id string) error {
r.lo.Debug("setting job as successful")
_, err := r.conn.RPush(ctx, resultPrefix+success, id).Result()
func (r *Results) GetFailed(ctx context.Context) ([]string, error) {
// Fetch the failed tasks with score less than current time
r.lo.Debug("getting failed jobs")
rs, err := r.conn.ZRevRangeByScore(ctx, resultPrefix+failed, &redis.ZRangeBy{
Min: "0",
Max: strconv.FormatInt(time.Now().UnixNano(), 10),
}).Result()
if err != nil {
return err
return nil, err
}

return nil
return rs, nil
}

func (r *Results) SetFailed(ctx context.Context, id string) error {
r.lo.Debug("setting job as failed")
_, err := r.conn.RPush(ctx, resultPrefix+failed, id).Result()
if err != nil {
return err
}
func (r *Results) SetSuccess(ctx context.Context, id string) error {
r.lo.Debug("setting job as successful", "id", id)
return r.conn.ZAdd(ctx, resultPrefix+success, &redis.Z{
Score: float64(time.Now().UnixNano()),
Member: id,
}).Err()
}

return nil
func (r *Results) SetFailed(ctx context.Context, id string) error {
r.lo.Debug("setting job as failed", "id", id)
return r.conn.ZAdd(ctx, resultPrefix+failed, &redis.Z{
Score: float64(time.Now().UnixNano()),
Member: id,
}).Err()
}

func (r *Results) Set(ctx context.Context, id string, b []byte) error {
r.lo.Debug("setting result for job", "id", id)
return r.conn.Set(ctx, resultPrefix+id, b, defaultExpiry).Err()
return r.conn.Set(ctx, resultPrefix+id, b, r.opt.Expiry).Err()
}

func (r *Results) Get(ctx context.Context, id string) ([]byte, error) {
Expand All @@ -134,6 +147,36 @@ func (r *Results) Get(ctx context.Context, id string) ([]byte, error) {
return rs, nil
}

// TODO: accpet a ctx here and shutdown gracefully
func (r *Results) expireMeta(ttl time.Duration) {
r.lo.Info("starting results meta purger", "ttl", ttl)

var (
tk = time.NewTicker(ttl)
)

for {
select {
// case <-ctx.Done():
// r.lo.Info("shutting down meta purger", "ttl", ttl)
// return
case <-tk.C:
now := time.Now().UnixNano() - int64(ttl)
score := strconv.FormatInt(now, 10)

r.lo.Debug("purging failed results metadata", "score", score)
if err := r.conn.ZRemRangeByScore(context.Background(), resultPrefix+failed, "0", score).Err(); err != nil {
r.lo.Error("could not expire success/failed metadata", "err", err)
}

r.lo.Debug("purging success results metadata", "score", score)
if err := r.conn.ZRemRangeByScore(context.Background(), resultPrefix+success, "0", score).Err(); err != nil {
r.lo.Error("could not expire success/failed metadata", "err", err)
}
}
}
}

func (r *Results) NilError() error {
return redis.Nil
}
6 changes: 3 additions & 3 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type TaskOpts struct {
// RegisterTask maps a new task against the tasks map on the server.
// It accepts different options for the task (to set callbacks).
func (s *Server) RegisterTask(name string, fn handler, opts TaskOpts) error {
s.log.Info("added handler", "name", name)
s.log.Debug("registered handler", "name", name, "options", opts)

if opts.Queue == "" {
opts.Queue = DefaultQueue
Expand Down Expand Up @@ -247,14 +247,14 @@ func (s *Server) Start(ctx context.Context) {

// consume() listens on the queue for task messages and passes the task to processor.
func (s *Server) consume(ctx context.Context, work chan []byte, queue string) {
s.log.Info("starting task consumer..")
s.log.Debug("starting task consumer..")
s.broker.Consume(ctx, work, queue)
}

// process() listens on the work channel for tasks. On receiving a task it checks the
// processors map and passes payload to relevant processor.
func (s *Server) process(ctx context.Context, w chan []byte) {
s.log.Info("starting processor..")
s.log.Debug("starting processor..")
for {
var span spans.Span
if s.traceProv != nil {
Expand Down
Loading